Hi internet
I have been trying to figure out the problem with my code, which is throwing the error below:
socket mode run error. WebSocketError(Protocol(ResetWithoutClosingHandshake))
with the following stack trace
and it only happens after 5-7 days of running on my server.
I have handled a case to retry the connection:
use crate::apps::connections_open::connections_open;
use crate::error::Error;
use crate::http_client::SlackWebAPIClient;
use crate::socket::event::{
AcknowledgeMessage, DisconnectEvent, DisconnectReason, EventsAPI, HelloEvent, InteractiveEvent,
SlashCommandsEvent, SocketModeEvent,
};
use async_std::fs::read;
use async_std::net::TcpStream;
use async_tls::client::TlsStream;
use async_tls::TlsConnector;
use async_trait::async_trait;
use async_tungstenite::tungstenite::Message;
use async_tungstenite::{client_async, WebSocketStream};
use futures_util::{SinkExt, StreamExt};
use rustls::ClientConfig;
use std::collections::HashMap;
use std::io::Cursor;
use std::sync::Arc;
use url::Url;
/// WebSocket stream layered over a TLS-wrapped TCP connection; the same type that
/// `SocketMode::get_ws` returns and that the event handler callbacks receive.
pub type Stream = WebSocketStream<TlsStream<TcpStream>>;
/// Callbacks invoked by [`SocketMode::run`] while it processes websocket traffic.
///
/// Implement this trait in your code to handle slack events. Every callback has a
/// default implementation that only writes an info-level log line, so an implementor
/// overrides just the events it cares about.
// The default bodies deliberately ignore most of their arguments.
#[allow(unused_variables)]
#[async_trait]
pub trait EventHandler<S>: Send
where
    S: SlackWebAPIClient,
{
    /// Called when a websocket close frame has been received.
    async fn on_close(&mut self, socket_mode: &SocketMode<S>) {
        log::info!("websocket close");
    }

    /// Called after a websocket connection has been established.
    async fn on_connect(&mut self, socket_mode: &SocketMode<S>) {
        log::info!("websocket connect");
    }

    /// Called for a hello event; `s` is the stream the event arrived on.
    async fn on_hello(&mut self, socket_mode: &SocketMode<S>, e: HelloEvent, s: &mut Stream) {
        log::info!("hello event: {:?}", e);
    }

    /// Called for a disconnect event that the client does not handle itself.
    async fn on_disconnect(
        &mut self,
        socket_mode: &SocketMode<S>,
        e: DisconnectEvent,
        s: &mut Stream,
    ) {
        log::info!("disconnect event: {:?}", e);
    }

    /// Called for an Events API event; `s` is the stream the event arrived on.
    async fn on_events_api(&mut self, socket_mode: &SocketMode<S>, e: EventsAPI, s: &mut Stream) {
        log::info!("events api event: {:?}", e);
    }

    /// Called for an interactive event; `s` is the stream the event arrived on.
    async fn on_interactive(
        &mut self,
        socket_mode: &SocketMode<S>,
        e: InteractiveEvent,
        s: &mut Stream,
    ) {
        log::info!("interactive event: {:?}", e);
    }

    /// Called for a slash command event; `s` is the stream the event arrived on.
    async fn on_slash_commands(
        &mut self,
        socket_mode: &SocketMode<S>,
        e: SlashCommandsEvent,
        s: &mut Stream,
    ) {
        log::info!("slash commands event: {:?}", e);
    }
}
/// The socket mode client.
///
/// Bundles the API client, the tokens and the connection settings that are needed
/// to open a socket mode websocket.
pub struct SocketMode<S: SlackWebAPIClient> {
    /// Web API client used when requesting a websocket URL.
    pub api_client: S,
    /// App-level token sent along with the connection-open request.
    pub app_token: String,
    /// Bot token supplied at construction time.
    pub bot_token: String,
    /// Free-form key/value options collected through the builder.
    pub option_parameter: HashMap<String, String>,
    /// TCP port used for the websocket connection (443 unless overridden).
    pub web_socket_port: u16,
    /// Optional path to a PEM file with CA certificates for the TLS connector.
    pub ca_file_path: Option<String>,
}
impl<S> SocketMode<S>
where
S: SlackWebAPIClient,
{
pub fn new(api_client: S, app_token: String, bot_token: String) -> Self {
SocketMode {
api_client,
app_token,
bot_token,
option_parameter: HashMap::new(),
web_socket_port: 443,
ca_file_path: None,
}
}
pub fn option_parameter(mut self, key: String, value: String) -> Self {
self.option_parameter.insert(key, value);
self
}
pub fn web_socket_port(mut self, port: u16) -> Self {
self.web_socket_port = port;
self
}
pub fn ca_file_path(mut self, ca_file_path: String) -> Self {
self.ca_file_path = Some(ca_file_path);
self
}
/// Run slack and websocket communication.
pub async fn run<T>(&mut self, handler: &mut T) -> Result<(), Error>
where
T: EventHandler<S>,
{
let mut ping_count = 0;
let mut ws = self.get_ws().await?;
handler.on_connect(self).await;
loop {
let message = ws.next().await.ok_or(Error::NotFoundStream)?;
match message? {
Message::Text(t) => {
log::info!("Message::Text {}", t);
let event = serde_json::from_str::<SocketModeEvent>(&t)?;
match event {
SocketModeEvent::HelloEvent(e) => handler.on_hello(&self, e, &mut ws).await,
SocketModeEvent::DisconnectEvent(e) => {
if e.reason == DisconnectReason::RefreshRequested {
log::info!("Reconnecting to slack socket server");
ws = self.get_ws().await?;
handler.on_connect(self).await;
} else {
handler.on_disconnect(self, e, &mut ws).await;
}
}
SocketModeEvent::EventsAPI(e) => {
handler.on_events_api(self, e, &mut ws).await
}
SocketModeEvent::InteractiveEvent(e) => {
handler.on_interactive(self, e, &mut ws).await
}
SocketModeEvent::SlashCommandsEvent(e) => {
handler.on_slash_commands(self, e, &mut ws).await
}
}
}
Message::Ping(p) => {
log::info!("ping: {:?}", p);
if ping_count > 200 {
log::info!("Reconnecting to slack socket server {}", ping_count);
ws = self.get_ws().await?;
ping_count = 0;
} else {
ping_count += 1;
}
}
Message::Close(_) => {
handler.on_close(self).await;
break;
}
m => log::warn!("unsupported web socket message: {:?}", m),
}
}
Ok(())
}
pub async fn get_ws(&self) -> Result<WebSocketStream<TlsStream<TcpStream>>, Error> {
let response = connections_open(&self.api_client, &self.app_token).await?;
let ws_url = response.url.ok_or(Error::SocketModeOpenConnectionError)?;
let ws_url_parsed = Url::parse(&ws_url)?;
let ws_domain = ws_url_parsed.domain().ok_or(Error::NotFoundDomain)?;
let tcp_stream = TcpStream::connect((ws_domain, self.web_socket_port)).await?;
let connector = if let Some(ca_file_path) = &self.ca_file_path {
connector_for_ca_file(ca_file_path).await?
} else {
TlsConnector::default()
};
let tls_stream = connector.connect(ws_domain, tcp_stream).await?;
let (ws, _) = client_async(&ws_url, tls_stream).await?;
Ok(ws)
}
}
pub async fn ack(envelope_id: &str, stream: &mut Stream) -> Result<(), Error> {
let json = serde_json::to_string(&AcknowledgeMessage { envelope_id })?;
stream
.send(Message::Text(json))
.await
.map_err(Error::WebSocketError)
}
pub async fn connector_for_ca_file(ca_file_path: &str) -> Result<TlsConnector, Error> {
let mut config = ClientConfig::new();
let file = read(ca_file_path).await?;
let mut pem = Cursor::new(file);
config
.root_store
.add_pem_file(&mut pem)
.map_err(|_| Error::InvalidInputError)?;
Ok(TlsConnector::from(Arc::new(config)))
}
Any tips would be appreciated, thanks!