I am writing a replay system for my simulator of an asynchronous distributed system, and I'm having a problem with an unbounded crossbeam channel.
During a run, each node creates a file of trace records. If I detect a bug, I replay the received messages from that file. It's mostly working, but there is one place where the replayed messages sometimes get lost. Below is a simplified version of the send loop. The actual code sends messages to other receivers; they never fail.
Here trace_lines is a Lines<BufReader<File>> iterator. At this point I've created the channel and gotten the transmit and receive parts to the right places. The thread::sleep makes sure the thread doesn't exit before the messages are delivered.
    thread::spawn(move || -> Result<(), Error> {
        loop {
            match trace_lines.next() {
                None => break,
                Some(msg) => {
                    println!("Sending message");
                    match tx.send(msg) {
                        Ok(_) => println!("Message sent"),
                        Err(e) => println!("Error sending message {}", e),
                    }
                }
            }
        }
        // Keep the thread (and tx) alive so the messages can be delivered.
        std::thread::sleep(std::time::Duration::from_secs(1000000));
        println!("Thread exit");
        Ok(())
    });
Here's a simplified version of the receiving side.
    thread::spawn(move || -> Result<(), Error> {
        loop {
            println!("Waiting for message");
            // Blocks until a message arrives; panics if the channel is disconnected.
            let msg = rx.recv().expect("Read error");
            println!("Got message");
        }
    });
Everything works about half the time. The other half of the time, my console shows
    Waiting for message
    Sending message
    Message sent
    Sending message
    Message sent
    Sending message
    Message sent
I never get an error on either the sending or receiving side. Any idea what I could be doing wrong?
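For anyone who wants to experiment, the send/receive pattern above can be sketched as a self-contained program. This is only a sketch under several assumptions: std::sync::mpsc stands in for the crossbeam unbounded channel so it builds without external crates, an in-memory Vec<String> replaces the trace file, the replay helper is a hypothetical name, and the sender drops tx at the end instead of sleeping so the receiver loop can terminate.

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical helper: sends every line over a channel from one thread and
// collects the messages on another, mirroring the two loops in the question.
fn replay(lines: Vec<String>) -> Vec<String> {
    // Assumption: std::sync::mpsc is used in place of crossbeam's unbounded channel.
    let (tx, rx) = mpsc::channel::<String>();

    let sender = thread::spawn(move || {
        for msg in lines {
            if let Err(e) = tx.send(msg) {
                println!("Error sending message {}", e);
            }
        }
        // tx is dropped when this closure returns, which disconnects the channel.
    });

    let receiver = thread::spawn(move || {
        let mut received = Vec::new();
        // recv() returns Err once all senders are gone and the queue is empty.
        while let Ok(msg) = rx.recv() {
            received.push(msg);
        }
        received
    });

    sender.join().expect("sender panicked");
    receiver.join().expect("receiver panicked")
}

fn main() {
    let lines: Vec<String> = (0..3).map(|i| format!("trace record {}", i)).collect();
    let received = replay(lines.clone());
    // With a single receiver, every message arrives, in the order it was sent.
    assert_eq!(received, lines);
    println!("Received {} messages", received.len());
}
```

In this sketch there is exactly one receiver, so nothing else can take messages off the queue. The real simulator uses crossbeam and a file-backed iterator, so the sketch only illustrates the intended behavior and does not reproduce the failure.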