hi,
I'm toying around with async and have these WebSocket ping-pong tasks that just check whether the connection is healthy.
So I ended up using task::spawn and tokio channels for messaging between them.
The issue I'm having is that these tasks accumulate many lines of code, and I don't know how to make the code more decoupled. I see that many libraries are built from lots of small functions, with almost nothing going beyond 10 lines of code.
Is there a way to improve this code in such a way?
use async_tungstenite::{
stream::Stream,
tokio::TokioAdapter,
tungstenite::{
protocol::{frame::coding::CloseCode, CloseFrame},
Message,
},
WebSocketStream,
};
use futures::SinkExt;
use futures::{
channel::mpsc::UnboundedReceiver,
stream::{SplitSink, StreamExt},
};
use tokio::net::TcpStream;
use tokio_native_tls::TlsStream;
// =======================================================================================================================
/// Lifecycle state of a WebSocket connection, published through a
/// `tokio::sync::watch` channel by the background tasks started in
/// `Connection::connect`.
///
/// `PartialEq`/`Eq` are derived so that states can be compared directly
/// (`status == ConnectionStatus::Connected`, `assert_eq!`, ...).
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
enum ConnectionStatus {
    /// Initial value of the status channel, before any event was observed.
    None,
    /// NOTE(review): not produced by the visible code; presumably "handshake
    /// in progress" — confirm.
    Connecting,
    /// Published whenever a ping or pong frame is received.
    Connected,
    /// NOTE(review): not produced by the visible code; presumably "retrying
    /// after a failure" — confirm.
    Reconnecting,
    /// Stops the keepalive loop when observed.
    Closing,
    /// Published after a close frame was received; stops the keepalive loop.
    Closed,
    /// Published when writing to the socket fails; stops the keepalive loop.
    Error,
}
/// Element type of `Connection::subscriptions`.
///
/// Currently a unit struct that carries no data.
struct Subscription;
/// State of a single WebSocket connection.
///
/// Created by `Connection::new`; `Connection::connect` fills in `message_tx`.
struct Connection {
/// Identifier of this connection; `new` sets it to a random (v4) UUID string.
id: String,
/// Subscriptions attached to this connection; starts out empty.
subscriptions: Vec<Subscription>,
/// Starts at 0. NOTE(review): never updated in the visible code; presumably
/// counts (re)connect attempts — confirm.
connection_attempts: std::sync::atomic::AtomicU8,
/// Starts as `false`. NOTE(review): never read in the visible code — confirm
/// its intended meaning.
enabled: std::sync::atomic::AtomicBool,
/// Sending half of the outgoing-message queue. `None` until `connect`
/// succeeds; messages sent here are written to the socket by a background task.
message_tx: Option<tokio::sync::mpsc::UnboundedSender<async_tungstenite::tungstenite::Message>>,
// Fields below are disabled drafts kept by the author.
// status: std::sync::Arc<tokio::sync::RwLock<ConnectionStatus>>,
// sender: Option<SplitSink<WebSocketStream<Stream<TokioAdapter<TcpStream>, TokioAdapter<TlsStream<TcpStream>>>>,Message,>,>,
// request_buffer: Option<tokio::sync::mpsc::UnboundedSender<String>>,
// watch: tokio::sync::watch::Receiver<String>,
}
impl Connection {
fn new() -> Self {
Connection {
id: uuid::Uuid::new_v4().to_string(),
subscriptions: Vec::new(),
connection_attempts: std::sync::atomic::AtomicU8::new(0),
enabled: std::sync::atomic::AtomicBool::new(false),
message_tx: None,
}
}
async fn connect(&mut self, uri: impl Into<String>) -> Result<(), ()> {
if let Ok((stream, response)) = async_tungstenite::tokio::connect_async(uri.into()).await {
let async_tungstenite::tungstenite::http::StatusCode::SWITCHING_PROTOCOLS = response.status() else {
return Err(());
};
let (mut stream_tx, stream_rx) = stream.split();
let (status_tx, mut status_rx) = tokio::sync::watch::channel(ConnectionStatus::None);
let (message_tx, mut message_rx) = tokio::sync::mpsc::unbounded_channel::<async_tungstenite::tungstenite::Message>();
self.message_tx = Some(message_tx);
// # send
tokio::spawn({
let status_tx = status_tx.clone();
async move {
while let Some(message) = message_rx.recv().await {
if let Err(error) = stream_tx.send(message).await {
if let Err(error) = status_tx.send(ConnectionStatus::Error) {
todo!();
break
}
break
}
}
}
});
let Connection { message_tx, .. } = self;
// # keepalive
tokio::spawn({
let status_tx = status_tx.clone();
let message_tx = message_tx.clone();
async move {
loop {
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
if let Some(ref message_tx) = message_tx {
let timestamp = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_millis().to_string();
if let Err(error) = message_tx.send(async_tungstenite::tungstenite::Message::Ping(timestamp.as_bytes().to_vec())) {
todo!();
}
}
tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await;
match status_rx.has_changed() {
Ok(changed) => if changed == false { break },
Err(_) => todo!(),
}
match *status_rx.borrow_and_update() {
ConnectionStatus::Closed | ConnectionStatus::Closing | ConnectionStatus::Error => break,
_ => (),
}
}
}
});
// # stream
tokio::spawn(async move {
stream_rx
.for_each(|message| async {
match message {
Ok(message) => match message {
Message::Text(message) => todo!(), // to the global message router
Message::Binary(vec) => todo!(),
Message::Ping(vec) => {
if let Err(error) = status_tx.send(ConnectionStatus::Connected) {
todo!();
}
}
Message::Pong(vec) => {
if let Err(error) = status_tx.send(ConnectionStatus::Connected) {
todo!();
}
}
Message::Close(close_frame) => {
if let Some(CloseFrame { reason, code }) = close_frame {
dbg!(reason);
match code {
CloseCode::Normal => todo!(),
CloseCode::Away => todo!(),
CloseCode::Protocol => todo!(),
CloseCode::Unsupported => todo!(),
CloseCode::Status => todo!(),
CloseCode::Abnormal => todo!(),
CloseCode::Invalid => todo!(),
CloseCode::Policy => todo!(),
CloseCode::Size => todo!(),
CloseCode::Extension => todo!(),
CloseCode::Error => todo!(),
CloseCode::Restart => todo!(),
CloseCode::Again => todo!(),
_ => todo!(),
}
}
if let Err(error) = status_tx.send(ConnectionStatus::Closed) {
todo!();
}
}
Message::Frame(frame) => todo!(),
},
Err(error) => {
dbg!(error);
}
}
})
.await
}); // .await;
return Ok(());
}
Err(())
}