Tokio channels and shared state

Hello,

I'm currently writing a client application that communicates with my server over TCP via my own little protocol.

Currently, the client handles outgoing packets in a separate task, which receives the packets to send via an mpsc channel.

As it's currently implemented, the client freezes after a while if there's no connection to the server, because the mpsc channel's buffer fills up and `send` blocks.

An unbounded channel would just result in the buffer filling up until my system runs out of memory.

I would like to avoid this and keep my client running even when there's no connection to a server.
Of course, the client wouldn't be able to send its packets, but at least it would be able to offer functionality that doesn't rely on a connection.

One idea I had to solve this issue was to use an AtomicBool wrapped in an Arc.
This boolean should tell the client application whether there's currently a connection; if there isn't, the client should not send new packets into the channel.

Even though this works, I would like to hear your thoughts on this approach and if you would use/prefer a different kind of solution.

Here's a little sample to demonstrate what I'm currently facing, without the AtomicBool.

Thank you very much.

Kind regards,

nurturee

use serde::{Deserialize, Serialize};
use tokio::io::AsyncWriteExt;
use tokio::net::TcpStream;
use tokio::sync::mpsc;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let (tx, mut rx) = mpsc::channel(30);

    tokio::spawn(async move {
        loop {
            // Retries immediately on failure; a real client would back off.
            let stream = TcpStream::connect(("127.0.0.1", 8080)).await;

            if let Ok(mut stream) = stream {
                loop {
                    tokio::select! {
                        Some(packet) = rx.recv() => {
                            let buf = match bincode::serialize(&packet) {
                                Ok(buf) => buf,
                                Err(_) => break,
                            };

                            // Length-prefixed framing: u64 length, then payload.
                            match stream.write_u64(buf.len() as u64).await {
                                Ok(_) => {
                                    if stream.write_all(&buf).await.is_err() {
                                        break;
                                    }
                                },
                                Err(_) => break,
                            }
                        }
                    }
                }
            }
        }
    });

    loop {
        let msg = Message {
            username: "bob".to_owned(),
            message: "Hello, World!".to_owned(),
        };

        tx.send(Packet::Message(msg)).await?;
    }
}

#[derive(Debug, Serialize, Deserialize)]
enum Packet {
    Message(Message)
}

#[derive(Debug, Serialize, Deserialize)]
struct Message {
    username: String,
    message: String,
}

Do you still need to communicate with the actor while it's dead?

By the sound of it, you're not tracking (getting feedback on) whether sent data is successfully received. If so, I don't see why you need to care about whether the send into the channel succeeds; just use try_send and ignore full errors.

Hello,

While this approach would solve my previous issue, it will introduce another one that I forgot to mention in my initial question.

I would like to avoid sending outdated packets; to achieve this, I would have to clear the buffer before allowing new messages.

try_send will succeed as soon as there's enough space in my channel's buffer, meaning that clearing the buffer could also clear new messages.

Thank you very much.

Kind regards,

nurturee

Hello,

If by Actor, you mean the task that is responsible for sending the messages received from the mpsc channel into the stream, then yea, there's no need to communicate with the actor while there's no connection.

Thank you very much.

Kind regards,

nurturee

How about having the task exit when the connection closes? Then the mpsc channel closes, and sends will tell you that the connection has closed. The next connection can make a new mpsc channel.

Hello,

While this approach could work, wouldn't it require me to handle reconnects in my client code by spawning a new task on each retry?

Couldn't this be solved in a simpler way by wrapping the sender in a Mutex and an Option, and simply setting the Option to None from the task that handles the receiver when there's no connection?

Kind regards,

nurturee