Socket mode run error

Hi internet

I have been trying to figure the problem with my code that is throwing the below error

socket mode run error. WebSocketError(Protocol(ResetWithoutClosingHandshake))

with the following stack trace

and it only happens after 5-7days of running in my server

I have handle a case to retry the connection

use crate::apps::connections_open::connections_open;
use crate::error::Error;
use crate::http_client::SlackWebAPIClient;
use crate::socket::event::{
    AcknowledgeMessage, DisconnectEvent, DisconnectReason, EventsAPI, HelloEvent, InteractiveEvent,
    SlashCommandsEvent, SocketModeEvent,
};
use async_std::fs::read;
use async_std::net::TcpStream;
use async_tls::client::TlsStream;
use async_tls::TlsConnector;
use async_trait::async_trait;
use async_tungstenite::tungstenite::Message;
use async_tungstenite::{client_async, WebSocketStream};
use futures_util::{SinkExt, StreamExt};
use rustls::ClientConfig;
use std::collections::HashMap;
use std::io::Cursor;
use std::sync::Arc;
use url::Url;

pub type Stream = WebSocketStream<TlsStream<TcpStream>>;

/// Implement this trait in your code to handle slack events.
#[allow(unused_variables)]
#[async_trait]
pub trait EventHandler<S>: Send
where
    S: SlackWebAPIClient,
{
    async fn on_close(&mut self, socket_mode: &SocketMode<S>) {
        log::info!("websocket close");
    }
    async fn on_connect(&mut self, socket_mode: &SocketMode<S>) {
        log::info!("websocket connect");
    }
    async fn on_hello(&mut self, socket_mode: &SocketMode<S>, e: HelloEvent, s: &mut Stream) {
        log::info!("hello event: {:?}", e);
    }
    async fn on_disconnect(
        &mut self,
        socket_mode: &SocketMode<S>,
        e: DisconnectEvent,
        s: &mut Stream,
    ) {
        log::info!("disconnect event: {:?}", e);
    }
    async fn on_events_api(&mut self, socket_mode: &SocketMode<S>, e: EventsAPI, s: &mut Stream) {
        log::info!("events api event: {:?}", e);
    }
    async fn on_interactive(
        &mut self,
        socket_mode: &SocketMode<S>,
        e: InteractiveEvent,
        s: &mut Stream,
    ) {
        log::info!("interactive event: {:?}", e);
    }
    async fn on_slash_commands(
        &mut self,
        socket_mode: &SocketMode<S>,
        e: SlashCommandsEvent,
        s: &mut Stream,
    ) {
        log::info!("slash commands event: {:?}", e);
    }
}

/// The socket mode client.
pub struct SocketMode<S>
where
    S: SlackWebAPIClient,
{
    pub api_client: S,
    pub app_token: String,
    pub bot_token: String,
    pub option_parameter: HashMap<String, String>,
    pub web_socket_port: u16,
    pub ca_file_path: Option<String>,
}

impl<S> SocketMode<S>
where
    S: SlackWebAPIClient,
{
    pub fn new(api_client: S, app_token: String, bot_token: String) -> Self {
        SocketMode {
            api_client,
            app_token,
            bot_token,
            option_parameter: HashMap::new(),
            web_socket_port: 443,
            ca_file_path: None,
        }
    }
    pub fn option_parameter(mut self, key: String, value: String) -> Self {
        self.option_parameter.insert(key, value);
        self
    }
    pub fn web_socket_port(mut self, port: u16) -> Self {
        self.web_socket_port = port;
        self
    }
    pub fn ca_file_path(mut self, ca_file_path: String) -> Self {
        self.ca_file_path = Some(ca_file_path);
        self
    }
    /// Run slack and websocket communication.
    pub async fn run<T>(&mut self, handler: &mut T) -> Result<(), Error>
    where
        T: EventHandler<S>,
    {
        let mut ping_count = 0;
        let mut ws = self.get_ws().await?;
        handler.on_connect(self).await;

        loop {
            let message = ws.next().await.ok_or(Error::NotFoundStream)?;

            match message? {
                Message::Text(t) => {
                    log::info!("Message::Text {}", t);
                    let event = serde_json::from_str::<SocketModeEvent>(&t)?;
                    match event {
                        SocketModeEvent::HelloEvent(e) => handler.on_hello(&self, e, &mut ws).await,
                        SocketModeEvent::DisconnectEvent(e) => {
                            if e.reason == DisconnectReason::RefreshRequested {
                                log::info!("Reconnecting to slack socket server");
                                ws = self.get_ws().await?;
                                handler.on_connect(self).await;
                            } else {
                                handler.on_disconnect(self, e, &mut ws).await;
                            }
                        }
                        SocketModeEvent::EventsAPI(e) => {
                            handler.on_events_api(self, e, &mut ws).await
                        }
                        SocketModeEvent::InteractiveEvent(e) => {
                            handler.on_interactive(self, e, &mut ws).await
                        }
                        SocketModeEvent::SlashCommandsEvent(e) => {
                            handler.on_slash_commands(self, e, &mut ws).await
                        }
                    }
                }
                Message::Ping(p) => {
                    log::info!("ping: {:?}", p);
                    if ping_count > 200 {
                        log::info!("Reconnecting to slack socket server {}", ping_count);
                        ws = self.get_ws().await?;
                        ping_count = 0;
                    } else {
                        ping_count += 1;
                    }
                }
                Message::Close(_) => {
                    handler.on_close(self).await;
                    break;
                }
                m => log::warn!("unsupported web socket message: {:?}", m),
            }
        }
        Ok(())
    }
    pub async fn get_ws(&self) -> Result<WebSocketStream<TlsStream<TcpStream>>, Error> {
        let response = connections_open(&self.api_client, &self.app_token).await?;
        let ws_url = response.url.ok_or(Error::SocketModeOpenConnectionError)?;
        let ws_url_parsed = Url::parse(&ws_url)?;
        let ws_domain = ws_url_parsed.domain().ok_or(Error::NotFoundDomain)?;

        let tcp_stream = TcpStream::connect((ws_domain, self.web_socket_port)).await?;
        let connector = if let Some(ca_file_path) = &self.ca_file_path {
            connector_for_ca_file(ca_file_path).await?
        } else {
            TlsConnector::default()
        };
        let tls_stream = connector.connect(ws_domain, tcp_stream).await?;

        let (ws, _) = client_async(&ws_url, tls_stream).await?;

        Ok(ws)
    }
}

pub async fn ack(envelope_id: &str, stream: &mut Stream) -> Result<(), Error> {
    let json = serde_json::to_string(&AcknowledgeMessage { envelope_id })?;
    stream
        .send(Message::Text(json))
        .await
        .map_err(Error::WebSocketError)
}

pub async fn connector_for_ca_file(ca_file_path: &str) -> Result<TlsConnector, Error> {
    let mut config = ClientConfig::new();
    let file = read(ca_file_path).await?;
    let mut pem = Cursor::new(file);
    config
        .root_store
        .add_pem_file(&mut pem)
        .map_err(|_| Error::InvalidInputError)?;
    Ok(TlsConnector::from(Arc::new(config)))
}

any tip would be appreciated thanks!!!

What have you done to try and reproduce the error? Where do you think the problem might lie? Please share the investigation you have done so far because right now your request is a complete job that would normally come with compensation.

I have tried to redeploy my code to check and the error can always be reproducible after 1 week on running the websocket connect.

Up to now I wasnt able to pin point where might the problem be. I have tried adding the reconnection login in the Close event but the same error happened as well.

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.