so should I use 'clone', Is it right?
Isn't the clone slow?
I'm just wondering.
I have full source.
use async_std::{
io::BufReader,
net::{TcpListener, TcpStream, ToSocketAddrs},
prelude::*,
task,
};
use futures::channel::mpsc;
use futures::sink::SinkExt;
use futures::{select, FutureExt};
use std::{
collections::hash_map::{Entry, HashMap},
future::Future,
sync::Arc,
};
use uuid::Uuid;
type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
type Sender<T> = mpsc::UnboundedSender<T>;
type Receiver<T> = mpsc::UnboundedReceiver<T>;
#[derive(Debug)]
enum Event {
NewPeer {
//name: String,
uuid :Uuid,
stream: Arc<TcpStream>,
shutdown: Receiver<Void>,
},
SecretMessage {
from: Uuid,
to: Vec<String>,
msg: String,
},
Message{
msg: String,
}
}
#[derive(Debug)]
enum Void {}
fn run() -> Result<()> {
task::block_on(accept_loop("127.0.0.1:7000"))
}
async fn accept_loop(addr: impl ToSocketAddrs) -> Result<()> {
let listener = TcpListener::bind(addr).await?;
let (broker_sender, broker_receiver) = mpsc::unbounded();
//
let broker_handle = task::spawn(broker_loop(broker_receiver));
let mut incoming = listener.incoming();
while let Some(stream) = incoming.next().await {
let socket_stream = stream?;
println!("Accepting from: {}", socket_stream.peer_addr()?);
spawn_and_log_error(connection_loop(broker_sender.clone(), socket_stream));
}
drop(broker_sender);
broker_handle.await;
Ok(())
}
async fn connection_loop(mut broker: Sender<Event>, stream: TcpStream) -> Result<()> {
let stream = Arc::new(stream);
let reader = BufReader::new(&*stream);
let mut lines = reader.lines(); // read text line
println!("uuid : {}", uuid);
let (_shutdown_sender, shutdown_receiver) = mpsc::unbounded::<Void>();
broker.send(Event::NewPeer {
uuid: uuid.clone(),
stream: Arc::clone(&stream),
shutdown: shutdown_receiver,
}).await.unwrap();
while let Some(line) = lines.next().await {
let line = line?;
let msg: String = line.trim().to_string();
broker.send(Event::Message { msg: msg }).await.unwrap();
}
Ok(())
}
async fn connection_writer_loop(messages: &mut Receiver<String>, stream: Arc<TcpStream>, shutdown: Receiver<Void>) -> Result<()> {
let mut stream = &*stream;
let mut messages = messages.fuse();
let mut shutdown = shutdown.fuse();
loop {
select! {
msg = messages.next().fuse() => match msg {
Some(msg) => stream.write_all(msg.as_bytes()).await?,
None => break,
},
void = shutdown.next().fuse() => match void {
Some(void) => match void {},
None => break,
}
}
}
Ok(())
}
async fn broker_loop(events: Receiver<Event>) {
let (disconnect_sender, mut disconnect_receiver) = mpsc::unbounded::<(Uuid, Receiver<String>)>(); // 1
let mut peers: HashMap<Uuid, Sender<String>> = HashMap::new();
let mut events = events.fuse();
loop {
let event = select! {
event = events.next().fuse() => match event {
None => break, // 2
Some(event) => event,
},
disconnect = disconnect_receiver.next().fuse() => {
let (uuid, _pending_messages) = disconnect.unwrap(); // 3
println!("Disconnected : {}", uuid);
assert!(peers.remove(&uuid).is_some());
continue;
},
};
match event {
Event::Message { msg } =>{
for val in &peers{
let (key, mut peer) = val;
peer.send(msg.clone()).await.unwrap();
}
}
Event::SecretMessage { from, to, msg } => {
for addr in to {
let uuid = Uuid::parse_str(&addr).unwrap();
if let Some(peer) = peers.get_mut(&uuid) {
let msg = format!("from {}: {}\n", from, msg);
peer.send(msg).await.unwrap() // 6
}
}
}
Event::NewPeer { uuid, stream, shutdown } => {
match peers.entry(uuid.clone()) {
Entry::Occupied(..) => (),
Entry::Vacant(entry) => {
let (client_sender, mut client_receiver) = mpsc::unbounded();
entry.insert(client_sender);
let mut disconnect_sender = disconnect_sender.clone();
spawn_and_log_error(async move {
let res = connection_writer_loop(&mut client_receiver, stream, shutdown).await;
disconnect_sender.send((uuid, client_receiver)).await // 4
.unwrap();
res
});
}
}
}
}
}
drop(peers); // 5
drop(disconnect_sender); // 6
while let Some((_name, _pending_messages)) = disconnect_receiver.next().await {
}
}
// spawn or error
// create task
fn spawn_and_log_error<F>(fut: F) -> task::JoinHandle<()>
where
F: Future<Output = Result<()>> + Send + 'static,
{
task::spawn(async move {
if let Err(e) = fut.await {
println!("{}", e)
}
})
}
fn main(){
println!("---> server on 7000");
run();
}