Using self with tokio for implementing a simple chat server

Hello everyone!
I'm new to Rust and very new to async Rust. In order to improve my understanding of async Rust, I have been trying to write to write a simple chat server using Tokio. Right now, the code that I have spins one task per client as well as a broadcaster task. The broadcaster task broadcast messages from one client to all others. In order to better structure the code, I've tried to implement it using structs. However, I have run into an issue where the Rust compiler does not seem to compile the code. The error I see is:

~/simpletokiochat$ cargo build
   Compiling simpletokiochat v0.1.0 (/home/runner/simpletokiochat)
error: cannot infer an appropriate lifetime
  --> src/main.rs:52:24
   |
52 |     pub async fn start(&self, port_num: u16) -> Result<(), &'static str> {
   |                        ^^^^^ ...but this borrow...
...
57 |         tokio::spawn(async move {
   |         ------------ this return type evaluates to the `'static` lifetime...
   |
note: ...can't outlive the lifetime `'_` as defined on the method body at 52:24
  --> src/main.rs:52:24
   |
52 |     pub async fn start(&self, port_num: u16) -> Result<(), &'static str> {
   |                        ^

error: aborting due to previous error

error: could not compile `simpletokiochat`.

To learn more, run the command again with --verbose.

Here is the entire code:

use std::env;
use std::sync::Arc;
use std::sync::Mutex;
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::mpsc;
use tokio::{
    io::{AsyncReadExt, AsyncWriteExt},
    sync::mpsc::{UnboundedReceiver, UnboundedSender},
};

const DEFAULT_PORT: u16 = 12800;

#[tokio::main]
async fn main() {
    let port_num = get_port().unwrap_or_else(|_e| {
        println!(
            "Program started without a port number, starting with default port {}",
            DEFAULT_PORT
        );
        DEFAULT_PORT
    });
    let s = LocalServer::new();
    s.start(port_num).await.unwrap();
}

fn get_port() -> Result<u16, &'static str> {
    let args: Vec<_> = env::args().collect();
    if args.len() < 2 {
        return Err("Invalid number of arguments");
    }
    let port_number = &args[1];
    let parse_result = port_number.parse::<u16>();
    match parse_result {
        Ok(port) => Ok(port),
        Err(_err) => Err("Invalid argument"),
    }
}

pub struct LocalServer {
    streams: Mutex<Vec<Arc<TcpStream>>>,
}

impl LocalServer {
    pub fn new() -> Self {
        LocalServer {
            streams: Mutex::new(vec![]),
        }
    }
    //todo: start a mpsc channel and an associated broadcaster thread
    //todo: hold onto the tcp listener stat's clone inside a mutex in localserver - similar to the other one
    //todo: have this task message the broadcaster on every new message
    pub async fn start(&self, port_num: u16) -> Result<(), &'static str> {
        let bind_addr = format!("127.0.0.1:{}", port_num);
        let listener = TcpListener::bind(&bind_addr).await.unwrap();
        let (tx, rx): (UnboundedSender<String>, UnboundedReceiver<String>) =
            mpsc::unbounded_channel();
        tokio::spawn(async move {
            loop {
                let data = rx.recv().await.unwrap();
                for st in self.streams.lock().unwrap().iter() {
                    st.write(data.as_bytes());
                }
            }
        });
        loop {
            let (mut socket, _) = listener.accept().await.unwrap();
            let reader_writer = Arc::new(socket);
            let thread_socket = reader_writer.clone();
            let thread_sender = tx.clone();
            tokio::spawn(async move {
                let mut buffer = [0; 1024];
                {
                    let my_streams = self.streams.lock().unwrap();
                    my_streams.push(thread_socket.clone());
                }
                loop {
                    match thread_socket.read(&mut buffer).await {
                        Ok(0) => {
                            return;
                        }
                        Ok(n) => {
                            if thread_socket.write_all(&buffer[0..n]).await.is_err() {
                                return;
                            }
                            let s: String =
                                (std::str::from_utf8(&buffer[0..n]).unwrap()).to_owned();
                            thread_sender.send(s).unwrap();
                        }
                        Err(_) => {
                            return;
                        }
                    }
                }
            });
        }
    }
}

As this requires the tokio crate, I could not create a Playground link, but here is a repl.it link

I would really appreciate if someone can help me understand how to get around the problem.

tokio::spawn creates a task that can outlive the function that spawns it, so it can't use variables like &self that are only temporarily borrowed by that function. The simplest fix is the make the start function take self instead of &self:

pub async fn start(self, port_num: u16)
1 Like

I talk about this problem in detail in my blog post Actors with Tokio. Specifically, see the "A run method on a struct" heading.

As a general word of advice, putting an IO resource such as a TcpStream behind a Mutex is pretty much always wrong. The standard solution is to spawn a new task for every IO resource, such that the task can have exclusive ownership of that IO resource. If anything else needs to talk to the IO resource, it should do so by sending messages to the task, not by accessing the IO resource object directly.

3 Likes

The Tokio crate is available on the playground.

I see, thank you for clarifying. I came to the earlier conclusion because I saw this issue open here

Well, it has changed since 2017.