At first everything works, but after the server has been running for a while, receiving text from the client becomes slow.

use futures_util::{stream::SplitSink, StreamExt, SinkExt};
use rand::Rng;
use tokio_tungstenite::{tungstenite::{http::StatusCode, handshake::server::Request}, WebSocketStream};
use dashmap::DashMap;
use tokio::{sync::{broadcast::{Sender, Receiver, self}, oneshot, mpsc}, time, net::{TcpStream, TcpListener}};
use tokio_tungstenite::tungstenite::{Message, handshake::server::Response};
use lazy_static::lazy_static;

use std::{io::Error as IoError, net::SocketAddr};
use std::panic;


lazy_static! {
    /// Process-wide registry of the write halves of logged-in WebSocket
    /// connections, keyed by a client identifier string.
    ///
    /// Each value is the `SplitSink` produced by splitting an accepted
    /// `WebSocketStream<TcpStream>`; outbound frames for a client are written
    /// through the sink stored under that client's key.
    pub static ref SOCKET_LIST: DashMap<String, SplitSink<WebSocketStream<TcpStream>, Message>> =
        DashMap::new();
}



/// Performs the WebSocket handshake for one accepted TCP connection and, when
/// the client's first frame is text, hands the stream to [`loginValue`].
///
/// The handshake is rejected with `401 Unauthorized` (body `"token not exist"`)
/// when the request URI carries no query string.
/// NOTE(review): the contents of the query string are not inspected, so any
/// request that has one is accepted — confirm whether a real token check is
/// still to be added.
///
/// # Parameters
/// * `addr` - peer address of the connection; used only in diagnostics.
/// * `raw_stream` - the freshly accepted TCP stream to upgrade.
///
/// Handshake failures and errors while reading the first frame are reported on
/// stderr and end the function; a connection that closes before sending
/// anything, or whose first frame is not text, is dropped without comment.
async fn handle_connection(addr: SocketAddr, raw_stream: TcpStream) {
    let handshake =
        tokio_tungstenite::accept_hdr_async(raw_stream, |req: &Request, response: Response| {
            if req.uri().query().is_none() {
                // Built only on the rejection path so accepted connections do
                // not pay for a response that is never sent.
                let reject = Response::builder()
                    .status(StatusCode::UNAUTHORIZED)
                    .body(Some("token not exist".into()))
                    .expect("a status code plus a string body is always a valid response");
                return Err(reject);
            }
            Ok(response)
        })
        .await;

    let mut ws_stream = match handshake {
        Ok(stream) => stream,
        Err(err) => {
            eprintln!("websocket handshake with {} failed: {}", addr, err);
            return;
        }
    };

    // The first frame is treated as the login message.
    match ws_stream.next().await {
        Some(Ok(first)) => {
            if first.is_text() {
                if let Ok(text) = first.into_text() {
                    loginValue(ws_stream, text).await;
                }
            }
        }
        Some(Err(err)) => {
            eprintln!("reading the first frame from {} failed: {}", addr, err);
        }
        // Stream ended before any frame arrived.
        None => {}
    }
}

/// Completes the login exchange for a freshly accepted WebSocket and wires the
/// connection into the server.
///
/// When `msg` is exactly `"ok"` the function:
/// 1. replies with a text frame `"ok"`;
/// 2. splits the socket and stores the write half in `SOCKET_LIST` under the
///    key `"test01"`;
/// 3. spawns a reader task that passes every inbound text frame to
///    [`sendMessage`];
/// 4. calls [`logicRun`].
///
/// Any other `msg`, or a failure to send the reply, ends the function without
/// registering anything (the socket is dropped).
///
/// # Parameters
/// * `socket` - the upgraded WebSocket stream; consumed by this function.
/// * `msg` - text of the client's first frame.
///
/// NOTE: nothing in this function removes the write half from `SOCKET_LIST`
/// once the reader task stops.
pub async fn loginValue(socket: WebSocketStream<TcpStream>, msg: String) {
    if msg != "ok" {
        return;
    }

    let mut socket = socket;
    if socket.send(Message::Text("ok".to_string())).await.is_err() {
        return;
    }

    let (sender, mut receiver) = socket.split();

    // Register the write half *before* the reader starts, so a text frame that
    // arrives immediately already finds a sink to be forwarded to.
    SOCKET_LIST.insert("test01".to_string(), sender);

    tokio::spawn(async move {
        // Stops on end-of-stream or on the first read error.
        while let Some(Ok(frame)) = receiver.next().await {
            match frame {
                // The future returned by `sendMessage` must be awaited: an
                // async fn does nothing until polled, so merely calling it
                // would silently drop the inbound text.
                Message::Text(text) => sendMessage(text).await,
                // Keep-alive traffic is not a reason to stop reading.
                Message::Ping(_) | Message::Pong(_) => {}
                // Close and every other frame kind end the reader.
                _ => break,
            }
        }
    });

    logicRun().await;
}

/// Starts the background traffic generator, at most once per process.
///
/// The generator is a detached task that loops forever: each round it draws a
/// random number from `1..3`, sends the first payload only when the draw is
/// `1`, always sends the second and third payloads, and then sleeps for
/// 100 ms. All payloads go through [`sendMessage`].
///
/// This function is awaited on every successful login. Spawning a fresh
/// never-ending loop each time would pile up generator tasks (and multiply
/// the outbound message rate) with every reconnect, so the spawn is guarded
/// by a `Once`; later calls return immediately.
pub async fn logicRun() {
    const PAYLOAD_OCCASIONAL: &str = "784548s421545sdfsdf78712154871512124578121sdf15ser8we7f4ds51f215x4fv5s7f8s5dzxjcivuaodjfaoduifojaodjfoaljdsofjaadsjoijeiojfzldjoajufieojaoifjdknvzlknmojdfiaojldmlfajeoijfaojdfoajueoijrfoaueifjaoufjoidjsofejuoajdfklnzlnvoajfojeoauirnfdoajfioejaourojfonclzjfoauwweoijfaodjfiajeofodjsofaeofjdaoufoinvzjokfjoawjefojoeuinkvnzljediaojfaoefjoajdjfoefjodajfinlzjvoajeioajfojeoifjaiuroanvzjeoifujaoifjaou";
    const PAYLOAD_SECOND: &str = "784548s421545sdfsdf78712154871512124578121sdf15ser8we7f4ds51f215x4fv5s7f8s5d1v2x4d54f8w7e84f5s1sdjiusj,.m,jkhjijskdlknn,muijkojiojlkjjjsdoifjwiojfkladsdjfoawefjodsajfknxvklzjiekjfoajdfkajweofjkldsjfoajfjdosfjewoafjdoskljfojfaosjeofjodsjfaojoejfoajdofjoaeofjadosjfoefjodjsfoklnvlknzlfnmoajeoijfaojfeofjoajfkldjfaojfeojidojaofjeioajfdijflkadjklnvozjdoifajefjaojeoifjakldfjioaejfiodjsoafjeoajfdloja";
    const PAYLOAD_THIRD: &str = "784548s421545sdfsdf78712154871512124578121sdf15ser8we7f4ds51f215x4fv5s7f8s5d1v2x4d54f8w7e84f5s1sdjiusj,.m,jkhjijskdlknn,m,jksuijnkfnm,snd,mfksdjkjknkkjzjkfjsdiui.m,mkjknk,m,jknm,nm,nkjhkjlujiljkjklhkjkjlkjlsdf,,m,nn,jkljiojklhljn,nm...,.klkjljihihsoiuoifjdklsjfidoskfajsdofusiodjflsdjufoiusoidjfklsjudiofjskdlfusiodjfklsduoiureijflsjdojufiojsodujfoiwejfsljdfiosjuieorjfsdljfiowuerfojdsofjowieurj";

    // Ensures only the first call spawns the generator.
    static GENERATOR_STARTED: std::sync::Once = std::sync::Once::new();

    GENERATOR_STARTED.call_once(|| {
        tokio::spawn(async move {
            loop {
                // The RNG handle is a temporary of this statement, so it is
                // not held across any `.await` below.
                let tid = rand::thread_rng().gen_range(1..3);
                if tid == 1 {
                    sendMessage(PAYLOAD_OCCASIONAL.to_string()).await;
                }
                sendMessage(PAYLOAD_SECOND.to_string()).await;
                sendMessage(PAYLOAD_THIRD.to_string()).await;
                time::sleep(time::Duration::from_millis(100)).await;
            }
        });
    });
}

pub async fn sendMessage(val:String){

    tokio::spawn(async move{
        let mut ssender = SOCKET_LIST.get_mut("test01");
        let mut userSender = ssender.as_mut().unwrap().value_mut();
        if userSender.send(tokio_tungstenite::tungstenite::Message::Text(val)).await.is_ok() {
            println!("send message...ok");
        };
    });
}



/// Binds the server to `0.0.0.0:3000` and serves connections until the process
/// is stopped, running [`handle_connection`] in its own task for each one.
///
/// # Errors
/// Returns the underlying I/O error when the listening socket cannot be bound.
/// Errors from `accept` are logged to stderr and do not stop the server.
#[tokio::main]
async fn main() -> Result<(), IoError> {
    let listener = TcpListener::bind("0.0.0.0:3000").await?;
    println!("start success");

    loop {
        match listener.accept().await {
            Ok((stream, addr)) => {
                tokio::spawn(handle_connection(addr, stream));
            }
            Err(err) => {
                // A failed accept (for example, too many open files) should
                // not shut the whole server down; pause briefly so a
                // persistent error does not turn into a busy loop.
                eprintln!("accept failed: {}", err);
                tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
            }
        }
    }
}