Rust send serialized struct from crossbeam channel to multiple receivers via tcp

I am trying to send a serialized struct over tcp to multiple machines, the struct being derived in each timestep of a continuous simulation.
The tcp handler receives the serialized struct (String type) by a crossbeam channel from another thread. My problem is that the rx.try_iter() will drain the crossbeam channel, and if more than one client is connected the clients can't receive the same struct. I tried moving the rx.try_iter() out of the individual handle_client() function, but couldn't achieve a good result. Thanks for your time and help!

This is what I have so far:

(Server side)

use std::net::{TcpListener, TcpStream};
use std::thread;
use std::io::{Read,Write,Error};
use serde::{Serialize, Deserialize};
use crossbeam_channel::unbounded;

#[derive(Serialize, Deserialize)]
pub struct Serialized {
    pub buffer: Vec<u32>,
    pub timestep: u128,
}

impl Serialized {
    pub fn serialize(buffer: Vec<u32>, timestep: u128) -> String {
        let x = Serialized {
           buffer,
           timestep 
        };
        serde_json::to_string(&x).unwrap()
    }
}

fn handle_client(mut stream: TcpStream, rx: crossbeam_channel::Receiver<String>)-> Result<(), Error> {
    println!("incoming connection from: {}", stream.peer_addr()?);
    loop {
        //receive from channel
        let serialized = match rx.try_iter().last(){
            Some(x) => x,
            None => continue,
        };

        //write to stream
        stream.write(serialized.as_bytes())?;
    }
}

pub fn start_server(rx: crossbeam_channel::Receiver<String>) {
    
    let listener = TcpListener::bind("localhost:8888").expect("Could not bind");
    
    for stream in listener.incoming() {
        let rx = rx.clone();
        match stream {
            Err(e)=> {eprintln!("failed: {}", e)}
            Ok(stream) => {
                
                thread::spawn(move || {
                    handle_client(stream, rx).unwrap_or_else(|error| eprintln!("{:?}", error));
                });
                
            }
        } 

    }
}

fn main() {

    let (tx, rx) = crossbeam_channel::unbounded::<i32>();

    thread::spawn(move || {
        start_server(rx);
    });

    let mut i = 0;
    loop {
        //simulation runs//
        std::thread::sleep_ms(100);
        i = i + 1;
        //

        let s =  Serialized {
           buffer: vec![1],
           timestep: i,
       };
        tx.send(s);
    }

}

(Client side)

use std::net::TcpStream;
use serde::{Serialize, Deserialize};
use std::error::Error as er;

#[derive(Serialize, Deserialize, Debug)]
pub struct Serialized {
    pub buffer: Vec<u32>,
    pub timestep: u128,
}

fn read_user_from_stream(tcp_stream: &mut TcpStream) -> Result<Serialized, Box<dyn er>> {
    let mut de = serde_json::Deserializer::from_reader(tcp_stream);
    let u = Serialized::deserialize(&mut de)?;

    Ok(u)
}

pub fn start_client() {
    loop {
        let mut stream = TcpStream::connect("localhost:8888").expect("could not connect");
        let serialized = read_user_from_stream(&mut stream).unwrap();
        println!("timestep: {}", serialized.timestep);
    }
}

fn main() {
    start_client();
}

Edit:
After a helpful comment on stackoverflow, I thought about using this problem with broadcasting implementation of the bus create. However, here the channel is bounded. Moreover, I have to keep the Bus struct in the main thread, where the simulations are. But then, I can't move it to the server thread, where I need to spawn a new BusReader for every tcp connection.

Assuming you're referring to the bus crate in your edit you have a couple options

  1. Use a mutex for the bus (Arc<Mutex<Bus>>) so the simulation and listener threads can both access it mutably

  2. Create an extra thread that holds the Bus and send messages to that thread from both the listener and the simulation threads (via a crossbeam_channel or an std::sync::mpsc::channel). The message would be an enum with variants that either held the new connection or the message to send to all the clients.

Which one works better for you will depend on what performance characteristics you care about.

Great, thanks a lot for these solutions. I will try to implement the enum variant you suggested.

This works now as expected:

use std::sync::{Arc, Mutex};
use std::thread;
use std::net::{TcpListener, TcpStream};
use std::io::{Read,Write,Error};
use bus::{Bus, BusReader};

fn main() {
    let mut x: u32 = 0;
    let bus = Bus::<u32>::new(10);

    let bus_mutex = Arc::new(Mutex::new(bus));

    let bus_mutex_cp = Arc::clone(&bus_mutex);
    thread::spawn(move || {
        start_server(bus_mutex_cp);
    });

    //simulation loop
    for _ in 0..99999 {
        x = x + 1;
        println!("Simulation step: {}", x);
        thread::sleep_ms(1000);
        bus_mutex.lock().unwrap().broadcast(x);
    }

    loop {}

}


pub fn start_server(bus_mutex: Arc<Mutex<Bus<u32>>>) {
    
    let listener = TcpListener::bind("0.0.0.0:8888").expect("Could not bind");
    
    for stream in listener.incoming() {
        
        match stream {
            Err(e)=> {eprintln!("failed: {}", e)}
            Ok(stream) => {
                
                let rx = bus_mutex.lock().unwrap().add_rx();
                thread::spawn(move || {
                    handle_client(stream, rx).unwrap_or_else(|error| eprintln!("{:?}", error));
                });
                
            }
        } 

    }
}

fn handle_client(mut stream: TcpStream, mut rx: BusReader<u32>)-> Result<(), Error> {
    println!("incoming connection from: {}", stream.peer_addr()?);
    loop {
        //receive from bus
        let x = rx.recv().unwrap();

        //write to stream
        stream.write(&x.to_string().as_bytes())?;

        thread::sleep_ms(100);
    }
}

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.