Advice for better decoupled code

hi,

I'm toying around with async and have this websocket ping pong request tasks that just checks if the connection is healthy.

So I ended up using task::spawn and tokio channels for messaging between them.

The issue I'm having is, that these tasks, accumulate many lines of code and I don't know how to make the code more decoupled. I see many libraries have so many small functions and almost nothing goes beyond 10 lines of code.

Is there a way to improve this code in such a way?

use async_tungstenite::{
    stream::Stream,
    tokio::TokioAdapter,
    tungstenite::{
        protocol::{frame::coding::CloseCode, CloseFrame},
        Message,
    },
    WebSocketStream,
};
use futures::SinkExt;
use futures::{
    channel::mpsc::UnboundedReceiver,
    stream::{SplitSink, StreamExt},
};
use tokio::net::TcpStream;
use tokio_native_tls::TlsStream;

// =======================================================================================================================

#[derive(Copy, Clone, Debug)]
enum ConnectionStatus {
    None,
    Connecting,
    Connected,
    Reconnecting,
    Closing,
    Closed,
    Error,
}

struct Subscription;

struct Connection {
    id: String,
    subscriptions: Vec<Subscription>,
    connection_attempts: std::sync::atomic::AtomicU8,
    enabled: std::sync::atomic::AtomicBool,
    message_tx: Option<tokio::sync::mpsc::UnboundedSender<async_tungstenite::tungstenite::Message>>,
    // status: std::sync::Arc<tokio::sync::RwLock<ConnectionStatus>>,
    // sender: Option<SplitSink<WebSocketStream<Stream<TokioAdapter<TcpStream>, TokioAdapter<TlsStream<TcpStream>>>>,Message,>,>,
    // request_buffer: Option<tokio::sync::mpsc::UnboundedSender<String>>,
    // watch: tokio::sync::watch::Receiver<String>,
}

impl Connection {
    fn new() -> Self {
        Connection {
            id: uuid::Uuid::new_v4().to_string(),
            subscriptions: Vec::new(),
            connection_attempts: std::sync::atomic::AtomicU8::new(0),
            enabled: std::sync::atomic::AtomicBool::new(false),
            message_tx: None,
        }
    }

    async fn connect(&mut self, uri: impl Into<String>) -> Result<(), ()> {
        if let Ok((stream, response)) = async_tungstenite::tokio::connect_async(uri.into()).await {
            let async_tungstenite::tungstenite::http::StatusCode::SWITCHING_PROTOCOLS = response.status() else {
                return Err(());
            };

            let (mut stream_tx, stream_rx) = stream.split();
            let (status_tx, mut status_rx) = tokio::sync::watch::channel(ConnectionStatus::None);
            let (message_tx, mut message_rx) = tokio::sync::mpsc::unbounded_channel::<async_tungstenite::tungstenite::Message>();

            self.message_tx = Some(message_tx);

            // # send
            tokio::spawn({
                let status_tx = status_tx.clone();
                async move {
                    while let Some(message) = message_rx.recv().await {
                        if let Err(error) = stream_tx.send(message).await {
                            if let Err(error) = status_tx.send(ConnectionStatus::Error) {
                                todo!();
                                break
                            }
                            break
                        }
                    }
                }
            });

            let Connection { message_tx, .. } = self;

            // # keepalive
            tokio::spawn({

                let status_tx = status_tx.clone();
                let message_tx = message_tx.clone();

                async move {
                    loop {
                        tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;

                        if let Some(ref message_tx) = message_tx {

                            let timestamp = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_millis().to_string();
                            if let Err(error) =  message_tx.send(async_tungstenite::tungstenite::Message::Ping(timestamp.as_bytes().to_vec())) {

                                todo!();
                            }
                        }

                        tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await;

                        match status_rx.has_changed() {
                            Ok(changed) => if changed == false { break },
                            Err(_) => todo!(),
                        }

                        match *status_rx.borrow_and_update() {
                            ConnectionStatus::Closed | ConnectionStatus::Closing | ConnectionStatus::Error => break,
                            _ => (),
                        }
                    }
                }
            });

            // # stream
            tokio::spawn(async move {
                stream_rx
                    .for_each(|message| async {
                        match message {
                            Ok(message) => match message {
                                Message::Text(message) => todo!(), // to the global message router
                                Message::Binary(vec) => todo!(),
                                Message::Ping(vec) => {
                                    if let Err(error) = status_tx.send(ConnectionStatus::Connected) {
                                        todo!();
                                    }
                                }
                                Message::Pong(vec) => {
                                    if let Err(error) = status_tx.send(ConnectionStatus::Connected) {
                                        todo!();
                                    }
                                }
                                Message::Close(close_frame) => {
                                    if let Some(CloseFrame { reason, code }) = close_frame {
                                        dbg!(reason);

                                        match code {
                                            CloseCode::Normal => todo!(),
                                            CloseCode::Away => todo!(),
                                            CloseCode::Protocol => todo!(),
                                            CloseCode::Unsupported => todo!(),
                                            CloseCode::Status => todo!(),
                                            CloseCode::Abnormal => todo!(),
                                            CloseCode::Invalid => todo!(),
                                            CloseCode::Policy => todo!(),
                                            CloseCode::Size => todo!(),
                                            CloseCode::Extension => todo!(),
                                            CloseCode::Error => todo!(),
                                            CloseCode::Restart => todo!(),
                                            CloseCode::Again => todo!(),
                                            _ => todo!(),
                                        }
                                    }

                                    if let Err(error) = status_tx.send(ConnectionStatus::Closed) {
                                        todo!();
                                    }
                                }
                                Message::Frame(frame) => todo!(),
                            },
                            Err(error) => {
                                dbg!(error);
                            }
                        }
                    })
                    .await
            }); // .await;

            return Ok(());
        }

        Err(())
    }

Look for cohesive chunks of code and split them into functions. How you do it will depend on your domain logic and the amount and complexity of the code involved.

The average library function is intended to do a very specific or very general thing, so that as a user of the library, you can assemble its functions into doing what you want. Libraries will inherently tend to have many small functions (and may or may not have fewer large functions). You should not take this out of context as being the only kind of good code.

That said, your code is pretty lengthy and deeply nested, so — as a matter of code style, not reusability — I would recommend having a few more functions than you currently do. What I would suggest trying is taking each of the three async move {} blocks you spawn(), and making it an async fn instead. This might look like:

        ...
        tokio::spawn(keepalive_task(status_rx, message_tx.clone()))
        ...
    }
}

async fn keepalive_task(
    status_rx: watch::Receiver<ConnectionStatus>,
    message_tx: mpsc::Sender<async_tungstenite::tungstenite::Message>,
) {
    loop {
        ...

These functions are not intended to be reusable elsewhere — only to divide the code into more individually digestible chunks. But if you make a habit of finding opportunity for such subdivision where you can, sometimes you’ll notice that one can be more generally useful. Don’t force it and overcomplicate things.

3 Likes

Speaking of nesting...

I often prefer let else over if let for early returns to reduce nesting. There's a prime example here.

async fn connect(&mut self, uri: impl Into<String>) -> Result<(), ()> {
    if let Ok((stream, response)) = ... {
        // Everything is nested :(
    }
    Err(())
}
async fn connect(&mut self, uri: impl Into<String>) -> Result<(), ()> {
    let Ok((stream, response)) = ... else {
        // Early return
        return Err(())
    }
    // Everything is un-nested :)
}
3 Likes