Check if SplitSink connection is open?

I have the following code which makes a connection with a websocket API and splits the websocket into a sender and reader.

        // Make websocket stream to API endpoint for making orders.
        let url = url::Url::parse(api.get_urls("ws_api_url").unwrap()).unwrap();
        let (ws_stream, _) = connect_async(url).await.expect("Failed to connect");
        let (mut send, mut read) = ws_stream.split();
        
       
        // Start websocket API of exchange and pass the transmitter as parameter
        // so the incoming data is send to the algorithm.
        tokio::spawn(async move {
            while let Some(message) = read.next().await {
                let data = message.unwrap().into_data();
                let data = match String::from_utf8(data) {
                    Ok(d) => d,
                    Err(e) => {
                        continue;
                    }
                };
            }
        }); 
        

        let send = Arc::new(Mutex::new(send));
       
        // Retrieve algorithm by id and start.
        let mut algorithm = TradeAlgorithm::get(algo_id.to_string(), psql.clone()).await.unwrap();
        let interval = algorithm.interval.to_string();
        let process_handle = match algorithm.start(psql, rx, api.clone(), send).await {
            Ok(ph) => ph,
            Err(e) => {
                eprintln!("Error 5 {}", e);
                return Ok(Routes::internal_server_error().await);
            }
        };

        // Start websocket API of exchange and pass the transmitter as parameter
        // so the incoming data is send to the algorithm.
        tokio::spawn(async move {
            match api.ws_kline(tx, interval).await {
                Ok(_) => (),
                Err(e) => {
                    eprintln!("{}", e);
                }
            }
        }); 

As you can see the sender gets passed as parameter deeper into my code. The sender gets passed to different methods and different threads.

The problem is that the websocket connection can close for some unexpected reason. I want to check if the connection is closed.

The code is too complex to check it when actually sending data, so I thought about making a separate thread to check if send can still send data:

        // Make websocket stream to API endpoint for making orders.
        let url = url::Url::parse(api.get_urls("ws_api_url").unwrap()).unwrap();
        let (ws_stream, _) = connect_async(url).await.expect("Failed to connect");
        let (mut send, mut read) = ws_stream.split();
        
       
        // Start websocket API of exchange and pass the transmitter as parameter
        // so the incoming data is send to the algorithm.
        tokio::spawn(async move {
            while let Some(message) = read.next().await {
                let data = message.unwrap().into_data();
                let data = match String::from_utf8(data) {
                    Ok(d) => d,
                    Err(e) => {
                        continue;
                    }
                };
            }
        }); 
        

        let send = Arc::new(Mutex::new(send));
       
        // Retrieve algorithm by id and start.
        let mut algorithm = TradeAlgorithm::get(algo_id.to_string(), psql.clone()).await.unwrap();
        let interval = algorithm.interval.to_string();
        let process_handle = match algorithm.start(psql, rx, api.clone(), send).await {
            Ok(ph) => ph,
            Err(e) => {
                eprintln!("Error 5 {}", e);
                return Ok(Routes::internal_server_error().await);
            }
        };

        // Start websocket API of exchange and pass the transmitter as parameter
        // so the incoming data is send to the algorithm.
        tokio::spawn(async move {
            match api.ws_kline(tx, interval).await {
                Ok(_) => (),
                Err(e) => {
                    eprintln!("{}", e);
                }
            }
        }); 

This however results in runtime errors indicating the websocket is closed, I guess because there is a lock on send.

My question: How to check in a different thread if the websocket connection is still open?

I currently have this solution:

    // Start a algorithm.
    pub async fn start_algorithm(&self, req: http::Http, psql: Psql, api: Api) -> Result<http::HttpResponse, http::Error> {

        // Retrieve algorithm ID from URL-paremeter.
        let algo_id = match req.params.get("id").cloned() {
            Some(id) => id,
            None => {
                return Ok(Routes::not_found().await);
            }
        };

        // Check if algorithm belongs to user doing request.
        match req.headers.get("session_token") {
            Some(token) => {
                match http::validate_session_token(token, http::DBTable::Algorithm(&*algo_id), psql.clone()).await {
                    Ok(v) => {
                        if !v {
                            return Ok(Routes::unauthorized().await);
                        }
                    },
                    Err(_) => {
                        return Ok(Routes::internal_server_error().await);
                    }
                }
            },
            None => {
                return Ok(Routes::unauthorized().await);
            }
        }
       
        #[async_recursion]
        async fn start_algorithm_recursive(algo_id: &str, psql: Psql, api: Api) -> Result<http::HttpResponse, http::Error> {
            // Make a transmitter and receiver. The receiver is passed to the algorithm when started
            // so it can receive the data from the transmitter which we pass to the exchange API.
            let (tx, rx) = mpsc::channel::<CandleStick>(10);
            let tx = Arc::new(Mutex::new(tx));
            let rx = Arc::new(Mutex::new(rx));
           
            // Make websocket stream to API endpoint for making orders.
            let url = url::Url::parse(api.get_urls("ws_api_url").unwrap()).unwrap();
            let (ws_stream, _) = connect_async(url.clone()).await.expect("Failed to connect");
            let (send, _) = ws_stream.split();
            
            let send = Arc::new(Mutex::new(send));

            // Retrieve algorithm by id and start.
            let algorithm = TradeAlgorithm::get(algo_id.to_string(), psql.clone()).await.unwrap();
            let interval = algorithm.interval.to_string();
            let (process_handle, thread_a_handle, thread_b_handle) = match algorithm.start(psql.clone(), rx, api.clone(), send).await {
                Ok((ph, th_a, th_b)) => (ph, th_a, th_b),
                Err(e) => {
                    eprintln!("Error 5 {}", e);
                    return Ok(Routes::internal_server_error().await);
                }
            };

            // Start websocket API of exchange and pass the transmitter as parameter
            // so the incoming data is send to the algorithm.
            let api_clone= api.clone();
            tokio::spawn(async move {
                match api_clone.ws_kline(tx, interval).await {
                    Ok(_) => (),
                    Err(e) => {
                        eprintln!("{}", e);
                    }
                }
            }); 


            // Add process handle to process-handle list so we can stop algorithm later.
            let mut shared_process_handles = tradealgorithm::PROCESS_HANDLES.lock().await;
            shared_process_handles.insert(algo_id.into(), process_handle);

            if let Err(e) = tokio::try_join!(thread_a_handle, thread_b_handle) {
                eprintln!("One of the tasks panicked: {:?}", e);
                start_algorithm_recursive(algo_id, psql.clone(), api.clone()).await?;
            }
        
            // Forge and return response.
            let response = http::HttpResponse {
                status: 200,
                headers: vec![
                    ("Content-Type".into(), "text/plain".into()),
                ],
                body: "Algorithm started".into(),
            };

            Ok(response)
        }

        start_algorithm_recursive(&*algo_id, psql, api).await
    }

I basically made a inner function. When a thread panics the recursive function is called again resulting in the websocket being reconnected.

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.