Async std simple chat how to works without clone?

Sample is here.
I change code.
https://book.async.rs/tutorial/all_together.html

I wonder. Does it work without a clone in loop? What's the difference?

  match event {
            Event::Message { msg } =>{    
                for val in &peers{
                    let (key, mut peer) = val;
                    //==============================
                    // This is error
                    //peer.send(msg).await.unwrap(); 
                    //==============================
                    peer.send(msg.clone()).await.unwrap(); // This is OK.
                }

            }
            Event::SecretMessage { from, to, msg } => {
                for addr in to {
                    let uuid = Uuid::parse_str(&addr).unwrap();
                    if let Some(peer) = peers.get_mut(&uuid) {

                        let msg = format!("from {}: {}\n", from, msg);
                        //==============================
                        // It works without clone even in the loop.
                        //  I don't know how it works without clone.
                        //==============================
                        peer.send(msg).await.unwrap() // 6 

                    }
                }
            }

You're creating a new message in the SecretMessage arm, so you don't need to clone it again.

Specifically, this line

let msg = format!("from {}: {}\n", from, msg);

creates a new string which has a copy of the original message in it. Since that line runs on every loop iteration, you don't need to clone the message. You can just move out of the variable and the next loop iteration will fill it in before it uses it again.

Thsnks reply.
I get it now.

so should I use 'clone', Is it right?

Isn't the clone slow?
I'm just wondering.

I have full source.

use async_std::{
    io::BufReader,
    net::{TcpListener, TcpStream, ToSocketAddrs},
    prelude::*,
    task,
};
use futures::channel::mpsc;
use futures::sink::SinkExt;
use futures::{select, FutureExt};
use std::{
    collections::hash_map::{Entry, HashMap},
    future::Future,
    sync::Arc,
};

use uuid::Uuid;

type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
type Sender<T> = mpsc::UnboundedSender<T>;
type Receiver<T> = mpsc::UnboundedReceiver<T>;

#[derive(Debug)]
enum Event {
    NewPeer {
        //name: String,
        uuid :Uuid,
        stream: Arc<TcpStream>,
        shutdown: Receiver<Void>,
    },
    SecretMessage {
        from: Uuid,
        to: Vec<String>,
        msg: String,
    },
    Message{
        msg: String,
    }
}

#[derive(Debug)]
enum Void {}

fn run() -> Result<()> {
    task::block_on(accept_loop("127.0.0.1:7000"))
}

async fn accept_loop(addr: impl ToSocketAddrs) -> Result<()> {
    let listener = TcpListener::bind(addr).await?;
    let (broker_sender, broker_receiver) = mpsc::unbounded();

    // 
    let broker_handle = task::spawn(broker_loop(broker_receiver));
    let mut incoming = listener.incoming();

 
    while let Some(stream) = incoming.next().await {
        let socket_stream = stream?;
        println!("Accepting from: {}", socket_stream.peer_addr()?);
 
        spawn_and_log_error(connection_loop(broker_sender.clone(), socket_stream));
    }

    drop(broker_sender);
    broker_handle.await;
    Ok(())
}

async fn connection_loop(mut broker: Sender<Event>, stream: TcpStream) -> Result<()> {
    let stream = Arc::new(stream);
    let reader = BufReader::new(&*stream);
    let mut lines = reader.lines(); // read text line

    println!("uuid : {}", uuid);

    let (_shutdown_sender, shutdown_receiver) = mpsc::unbounded::<Void>();


    broker.send(Event::NewPeer {
        uuid: uuid.clone(),
        stream: Arc::clone(&stream),
        shutdown: shutdown_receiver,
    }).await.unwrap();

    while let Some(line) = lines.next().await {
        let line = line?;
        let msg: String = line.trim().to_string();
        broker.send(Event::Message { msg: msg }).await.unwrap();

    }

    Ok(())
}

async fn connection_writer_loop(messages: &mut Receiver<String>, stream: Arc<TcpStream>, shutdown: Receiver<Void>) -> Result<()> {
    let mut stream = &*stream;
    let mut messages = messages.fuse();
    let mut shutdown = shutdown.fuse();
    loop {
        select! {
            msg = messages.next().fuse() => match msg {
                Some(msg) => stream.write_all(msg.as_bytes()).await?,
                None => break,
            },
            void = shutdown.next().fuse() => match void {
                Some(void) => match void {},
                None => break,
            }
        }
    }
    Ok(())
}

async fn broker_loop(events: Receiver<Event>) {
    let (disconnect_sender, mut disconnect_receiver) = mpsc::unbounded::<(Uuid, Receiver<String>)>(); // 1
    let mut peers: HashMap<Uuid, Sender<String>> = HashMap::new();
    let mut events = events.fuse();
    loop {
        let event = select! {
            event = events.next().fuse() => match event {
                None => break, // 2
                Some(event) => event,
            },
            disconnect = disconnect_receiver.next().fuse() => {
                let (uuid, _pending_messages) = disconnect.unwrap(); // 3
                println!("Disconnected : {}", uuid);
                assert!(peers.remove(&uuid).is_some());
                continue;
            },
        };
        match event {
            Event::Message { msg } =>{
              
                for val in &peers{
                    let (key, mut peer) = val;
                    peer.send(msg.clone()).await.unwrap();
                   }
            }
            Event::SecretMessage { from, to, msg } => {
                for addr in to {
                    let uuid = Uuid::parse_str(&addr).unwrap();
                    if let Some(peer) = peers.get_mut(&uuid) {

                        let msg = format!("from {}: {}\n", from, msg);
                        peer.send(msg).await.unwrap() // 6

                    }
                }
            }
            Event::NewPeer { uuid, stream, shutdown } => {
                match peers.entry(uuid.clone()) {
                    Entry::Occupied(..) => (),
                    Entry::Vacant(entry) => {
                        let (client_sender, mut client_receiver) = mpsc::unbounded();
                        entry.insert(client_sender);
                        let mut disconnect_sender = disconnect_sender.clone();
                        spawn_and_log_error(async move {
                            let res = connection_writer_loop(&mut client_receiver, stream, shutdown).await;
                            disconnect_sender.send((uuid, client_receiver)).await // 4
                                .unwrap();
                            res
                        });
                    }
                }
            }
        }
    }
    drop(peers); // 5
    drop(disconnect_sender); // 6
    while let Some((_name, _pending_messages)) = disconnect_receiver.next().await {
    }
}

// spawn or error 
// create task
fn spawn_and_log_error<F>(fut: F) -> task::JoinHandle<()>
where
    F: Future<Output = Result<()>> + Send + 'static,
{
    task::spawn(async move {
        if let Err(e) = fut.await {
            println!("{}", e)
        }
    })
}

fn main(){
    println!("--->  server on 7000");
    run();
}
    

Clone creates a full copy of the string for each peer. Probably not disastrously slow for a toy project, but certainly work that you don't really need to be doing.

Skimming your code, I think you could use an Arc<String> instead, then you would call clone on the Arc which is much cheaper than cloning the whole string[1]. You'll have to update the Sender and Receiver types for that to work though.


  1. it just increments a reference count ↩︎

Thank you for your kind answer.

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.