Correct/idiomatic design for threads and channels

Hello,

As I've just finished the Book I figured I'd give myself a small project to work on. After a week nights of feeling at loss I'd like some input on the design and how I can appease the borrow checker :slight_smile:

What I'd like to create is a Session struct which allows me to read and write data using a TcpStream, whilst also being able to receive "control commands" from the caller, main() in this case.
I decided to use a Session which owns the Ipc struct for communication between the multiple threads.

I figured I'd need threads to handle the reading off the network and handle the Command's that could come in. However with the code below I run into a problem as demonstrated by the output:

Created a new session to: 127.0.0.1
Estableshing connection...
Sending a short message right onto the socket: [1, 2, 3]
And received: 222
Calling run()
cmd_r: Ready to receive.
msg_s: sending: 227
msg_s: sending: 183
msg_s: sending: 222
msg_s: sending: 88

The call to run() works, but it never gets past that point in main() where it would spawn the threads to handle reading from the channels and it just hangs.

So I figured I'd move calling session.run() into a thread as well, but this is where I'm currently stuck:

error[E0502]: cannot borrow `session` as immutable because it is also borrowed as mutable
   --> src/main.rs:136:21
    |
132 |             s.spawn(|_| {
    |                     --- mutable borrow occurs here
133 |                 session.run();
    |                 ------- first borrow occurs due to use of `session` in closure
...
136 |             s.spawn(|_| {
    |               ----- ^^^ immutable borrow occurs here
    |               |
    |               mutable borrow later used by call
137 |                 loop {
138 |                     println!("msg_r: read: {:?}", session.ipc.msg_r.recv().unwrap());
    |                                                   ------- second borrow occurs due to use of `session` in closure

with this code in the thread::scope:

    	s.spawn(|_| {
	        session.run();
    	});

I think I can see why this is a failure because of mutability of self in run(), right? But I could use some help on whether this is the way to go or otherwise how I should redesign this in an idiomatic and working fashion.

Thank you.

use rand::prelude::*;
use std::io::prelude::*;
use std::net::TcpStream;
use std::time::Duration;

use crossbeam_channel::{unbounded, Receiver, Sender};
use crossbeam_utils::thread;

#[derive(Debug, PartialEq)]
pub enum Command {
    Hello,
    Bye,
}

#[derive(Debug)]
pub struct Ipc {
    // Send/receive messages obtained from a socket.
    msg_s: Sender<Box<u8>>,
    pub msg_r: Receiver<Box<u8>>,

    // Sends/receive a Command.
    pub cmd_s: Sender<Command>,
    cmd_r: Receiver<Command>,
}

impl Ipc {
    pub fn new() -> Ipc {
        let (msg_s, msg_r) = unbounded();
        let (cmd_s, cmd_r) = unbounded();
        Ipc { msg_s, msg_r, cmd_s, cmd_r }
    }
}

#[derive(Debug)]
pub struct Session {
    pub hostname: String,
    pub sock: Option<TcpStream>,
    pub ipc: Ipc,
}

impl Session {
    pub fn new(hostname: &str) -> Session {
        Session {
            hostname: hostname.to_string(),
            sock: None,
            ipc: Ipc::new(),
        }
    }

    pub fn connect(&mut self) -> std::io::Result<()> {
        // Just a netcat listener works fine.
        match TcpStream::connect(format!("{}:44444", self.hostname)) {
            Ok(t) => self.sock = Some(t),
            Err(_) => panic!("Could not connect to localhost"),
        }

        Ok(())
    }

    // Send a message across the connection
    pub fn write(&mut self, message: Box<[u8]>) -> std::io::Result<()> {
        self.sock.as_ref().unwrap().write(&message)?;

        Ok(())
    }

    // Read something from the connection (stub function)
    pub fn read(&mut self) -> u8 {
        std::thread::sleep(Duration::from_secs(1));
        random()
    }

    // Run forever (or until Command::Bye arrives).
    //
    // It reads data from the network and sends that back to the caller of this function across a channel.
    // Secondly it will read from another channel waiting for any commands to act on.
    pub fn run(&mut self) {
        let cmd_r2 = self.ipc.cmd_r.clone();
        thread::scope(|s| {
            s.spawn(|_| {
                loop {
                    let msg = Box::new(self.read());
                    println!("msg_s: sending: {:?}", msg);
                    self.ipc.msg_s.send(msg).unwrap();
                }
            });

            s.spawn(move |_| {
                println!("cmd_r: Ready to receive.");
                loop {
                    match cmd_r2.try_recv() {
                        Ok(c) if Command::Hello == c => {
                            println!("cmd_r: Well hello to you too!");
                        },
                        Ok(c) if Command::Bye == c => {
                            println!("cmd_r: Bye!");
                            break;
                        },
                        Ok(c) => println!("Unknown command: {:?}", c),
                        Err(_) => {},
                    }
                }
            });
        }).unwrap();

        println!("run: Bye");
    }
}

fn main() {
    let mut session = Session::new("127.0.0.1");
    println!("Created a new session to: {}", session.hostname);

    println!("Estableshing connection...");
    session.connect().unwrap();

    println!("Calling run()");
    session.run();

    thread::scope(|s| {
        s.spawn(|_| {
            loop {
                println!("msg_r: read: {:?}", session.ipc.msg_r.recv().unwrap());
            }    
        });

        s.spawn(|_| {
            println!("reaper: After 5 seconds I'll quit.");
            std::thread::sleep(Duration::from_secs(5));
            session.ipc.cmd_s.send(Command::Bye).unwrap();
        });
    }).unwrap();

    println!("main: Bye");
}

Not put a massive amount of thought into reply. Maybe you should look at using two structures along the lines of Client and Server.

Thanks Jonh for that suggestion, that makes sense. I've now split the code into a clear client and server separation. The code is up at https://gist.github.com/jasperla/ea594a27fe83df88c486fb44b7a352db

However it currently fails to compile at this point:

error[E0596]: cannot borrow `*self` as mutable, as it is behind a `&` reference
  --> src/main.rs:60:25
   |
60 |                         self.send(Box::new([0x1, 0x2, 0x3])).unwrap();
   |                         ^^^^ cannot borrow as mutable

If I remove this part the the communication between the threads via channels works fine.

Since I have to write to the TcpStream, self must be mutable..

What's wrong with making the run method take &mut self? Also, please be aware that the write method does not guarantee that it writes the full buffer — it will write some of it and return how many bytes it wrote. You probably want write_all.

Also, why do you use a Box<[u8]>? That's a rather odd choice considering you're just writing the data. Just use a slice &[u8].

Thank you alice! There is nothing wrong with it, I simply hadn't though of that anymore. Now my example works as intended :smiley:

As for your other points, thank you for them. I appreciate your review!

1 Like

This topic was automatically closed 90 days after the last reply. New replies are no longer allowed.