Hello,
As I've just finished the Book, I figured I'd give myself a small project to work on. After a week of evenings feeling at a loss, I'd like some input on the design and on how I can appease the borrow checker.
What I'd like to create is a Session struct which allows me to read and write data using a TcpStream, whilst also being able to receive "control commands" from the caller, main() in this case.
I decided to use a Session which owns the Ipc struct for communication between the multiple threads. I figured I'd need threads to handle reading off the network and to handle the Commands that could come in. However, with the code below I run into a problem, as demonstrated by the output:
Created a new session to: 127.0.0.1
Estableshing connection...
Sending a short message right onto the socket: [1, 2, 3]
And received: 222
Calling run()
cmd_r: Ready to receive.
msg_s: sending: 227
msg_s: sending: 183
msg_s: sending: 222
msg_s: sending: 88
The call to run() works, but it never gets past that point in main(), where it would spawn the threads to handle reading from the channels; it just hangs.
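A likely explanation for the hang: a scoped-thread block does not return until every thread spawned inside it has been joined, and the reader loop in run() never breaks. The sketch below illustrates just that blocking behaviour. It assumes `std::thread::scope` (Rust 1.63+) as a stand-in for `crossbeam_utils::thread::scope`, and the function name `scope_blocks_until_joined` is made up for illustration.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Records the order of three events to show that code after a
/// scope only runs once the scoped threads have finished.
/// (Hypothetical helper, not part of the original post.)
fn scope_blocks_until_joined() -> Vec<&'static str> {
    let (tx, rx) = mpsc::channel();

    thread::scope(|s| {
        let tx_worker = tx.clone();
        s.spawn(move || {
            // Stand-in for a long-running loop such as the socket reader.
            thread::sleep(Duration::from_millis(50));
            tx_worker.send("worker finished").unwrap();
        });
        // `spawn` itself returns immediately.
        tx.send("spawn returned").unwrap();
    });

    // Only reached after every thread in the scope has been joined.
    // If the worker looped forever, this line would never run.
    tx.send("after scope").unwrap();
    drop(tx);
    rx.iter().collect()
}
```

By the same reasoning, anything placed after `session.run()` in main() can only start once both loops inside run() have ended.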
So I figured I'd move calling session.run() into a thread as well, but this is where I'm currently stuck:
error[E0502]: cannot borrow `session` as immutable because it is also borrowed as mutable
   --> src/main.rs:136:21
    |
132 |             s.spawn(|_| {
    |                     --- mutable borrow occurs here
133 |                 session.run();
    |                 ------- first borrow occurs due to use of `session` in closure
...
136 |             s.spawn(|_| {
    |               ----- ^^^ immutable borrow occurs here
    |               |
    |               mutable borrow later used by call
137 |                 loop {
138 |                     println!("msg_r: read: {:?}", session.ipc.msg_r.recv().unwrap());
    |                                                   ------- second borrow occurs due to use of `session` in closure
with this code in the thread::scope:
s.spawn(|_| {
    session.run();
});
I think I can see why this fails: run() takes self mutably, right? But I could use some help on whether this is the way to go, or otherwise how I should redesign this in an idiomatic and working fashion.
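To make the borrow conflict concrete: a method taking `&mut self` borrows the whole struct for as long as the closure calling it lives, so no other closure may touch any field in the meantime. One way around this, shown as a minimal sketch only, is to borrow the fields separately instead of going through a `&mut self` method. The `Reader` type, the flattened `Session` layout and the `pump` function are hypothetical stand-ins, and `std::thread::scope` plus `std::sync::mpsc` are used here in place of the crossbeam equivalents.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread;

// Hypothetical stand-in for the part of the session that needs `&mut`.
struct Reader {
    next: u8,
}

impl Reader {
    // Stub "read": returns 1, 2, 3, ... instead of touching a socket.
    fn read(&mut self) -> u8 {
        self.next += 1;
        self.next
    }
}

// Hypothetical, flattened version of the Session/Ipc pair.
struct Session {
    reader: Reader,
    msg_s: Sender<u8>,
    msg_r: Receiver<u8>,
}

fn pump(count: usize) -> Vec<u8> {
    let (msg_s, msg_r) = channel();
    let mut session = Session { reader: Reader { next: 0 }, msg_s, msg_r };

    // Destructuring yields one borrow per field. The borrow checker
    // treats these as disjoint, unlike a call to a `&mut self` method.
    let Session { reader, msg_s, msg_r } = &mut session;

    thread::scope(|s| {
        // The producer thread takes only `reader` and `msg_s`.
        s.spawn(move || {
            for _ in 0..count {
                msg_s.send(reader.read()).unwrap();
            }
        });
        // The calling thread keeps `msg_r` and drains it concurrently.
        (0..count).map(|_| msg_r.recv().unwrap()).collect()
    })
}
```

Calling `pump(3)` returns `[1, 2, 3]`. The same idea applies with crossbeam's scope: pass the closures only the fields they need rather than the whole `session`.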
Thank you.
use rand::prelude::*;
use std::io::prelude::*;
use std::net::TcpStream;
use std::time::Duration;
use crossbeam_channel::{unbounded, Receiver, Sender};
use crossbeam_utils::thread;
#[derive(Debug, PartialEq)]
pub enum Command {
    Hello,
    Bye,
}

#[derive(Debug)]
pub struct Ipc {
    // Send/receive messages obtained from a socket.
    msg_s: Sender<Box<u8>>,
    pub msg_r: Receiver<Box<u8>>,
    // Send/receive a Command.
    pub cmd_s: Sender<Command>,
    cmd_r: Receiver<Command>,
}

impl Ipc {
    pub fn new() -> Ipc {
        let (msg_s, msg_r) = unbounded();
        let (cmd_s, cmd_r) = unbounded();
        Ipc { msg_s, msg_r, cmd_s, cmd_r }
    }
}

#[derive(Debug)]
pub struct Session {
    pub hostname: String,
    pub sock: Option<TcpStream>,
    pub ipc: Ipc,
}

impl Session {
    pub fn new(hostname: &str) -> Session {
        Session {
            hostname: hostname.to_string(),
            sock: None,
            ipc: Ipc::new(),
        }
    }

    pub fn connect(&mut self) -> std::io::Result<()> {
        // Just a netcat listener works fine.
        match TcpStream::connect(format!("{}:44444", self.hostname)) {
            Ok(t) => self.sock = Some(t),
            Err(_) => panic!("Could not connect to localhost"),
        }
        Ok(())
    }

    // Send a message across the connection
    pub fn write(&mut self, message: Box<[u8]>) -> std::io::Result<()> {
        self.sock.as_ref().unwrap().write(&message)?;
        Ok(())
    }

    // Read something from the connection (stub function)
    pub fn read(&mut self) -> u8 {
        std::thread::sleep(Duration::from_secs(1));
        random()
    }

    // Run forever (or until Command::Bye arrives).
    //
    // It reads data from the network and sends that back to the caller of this function across a channel.
    // Secondly it will read from another channel waiting for any commands to act on.
    pub fn run(&mut self) {
        let cmd_r2 = self.ipc.cmd_r.clone();
        thread::scope(|s| {
            s.spawn(|_| {
                loop {
                    let msg = Box::new(self.read());
                    println!("msg_s: sending: {:?}", msg);
                    self.ipc.msg_s.send(msg).unwrap();
                }
            });
            s.spawn(move |_| {
                println!("cmd_r: Ready to receive.");
                loop {
                    match cmd_r2.try_recv() {
                        Ok(c) if Command::Hello == c => {
                            println!("cmd_r: Well hello to you too!");
                        },
                        Ok(c) if Command::Bye == c => {
                            println!("cmd_r: Bye!");
                            break;
                        },
                        Ok(c) => println!("Unknown command: {:?}", c),
                        Err(_) => {},
                    }
                }
            });
        }).unwrap();
        println!("run: Bye");
    }
}

fn main() {
    let mut session = Session::new("127.0.0.1");
    println!("Created a new session to: {}", session.hostname);
    println!("Estableshing connection...");
    session.connect().unwrap();
    println!("Calling run()");
    session.run();
    thread::scope(|s| {
        s.spawn(|_| {
            loop {
                println!("msg_r: read: {:?}", session.ipc.msg_r.recv().unwrap());
            }
        });
        s.spawn(|_| {
            println!("reaper: After 5 seconds I'll quit.");
            std::thread::sleep(Duration::from_secs(5));
            session.ipc.cmd_s.send(Command::Bye).unwrap();
        });
    }).unwrap();
    println!("main: Bye");
}
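
One possible redesign, offered as a rough sketch and not as the definitive answer: move everything the worker needs into a single spawned thread, and hand the caller a small handle that owns only the channel endpoints it should use. With ownership split this way, nothing is shared by reference and the borrow checker has nothing to object to. The names `SessionHandle` and `spawn_session` are hypothetical, the socket read is replaced by a counter stub, and `std::sync::mpsc` stands in for crossbeam-channel.

```rust
use std::sync::mpsc::{channel, Receiver, Sender, TryRecvError};
use std::thread::{self, JoinHandle};
use std::time::Duration;

#[derive(Debug, PartialEq)]
enum Command {
    Hello,
    Bye,
}

// Hypothetical handle: the caller's side of the session.
struct SessionHandle {
    cmd_s: Sender<Command>,
    msg_r: Receiver<u8>,
    worker: JoinHandle<()>,
}

fn spawn_session() -> SessionHandle {
    let (msg_s, msg_r) = channel();
    let (cmd_s, cmd_r) = channel();

    // The worker owns its endpoints outright, so no borrows cross threads.
    let worker = thread::spawn(move || {
        let mut next: u8 = 0; // stub standing in for a socket read
        loop {
            // Check for a pending command without blocking.
            match cmd_r.try_recv() {
                Ok(Command::Bye) | Err(TryRecvError::Disconnected) => break,
                Ok(Command::Hello) => println!("cmd_r: Well hello to you too!"),
                Err(TryRecvError::Empty) => {}
            }
            next = next.wrapping_add(1);
            // Stop if the caller has dropped its receiver.
            if msg_s.send(next).is_err() {
                break;
            }
            // Pace the loop; a real reader would block on the socket here.
            thread::sleep(Duration::from_millis(10));
        }
    });

    SessionHandle { cmd_s, msg_r, worker }
}
```

The caller would then use `handle.msg_r.recv()` to read data, `handle.cmd_s.send(Command::Bye)` to request shutdown, and `handle.worker.join()` to wait for the thread. With a real TcpStream, a read timeout (for example via `TcpStream::set_read_timeout`) would presumably be needed so that a blocking read does not delay command handling indefinitely.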