Lost message with crossbeam channel

I am writing a replay system for my simulator of an asynchronous distributed system, and I'm having a problem with an unbounded crossbeam channel.

During a run, each node creates a file of trace records. If I detect a bug, I replay the received messages from that file. It's mostly working, but there is one place where the replayed messages sometimes get lost. Below is a simplified version of the send loop. The actual code sends messages to other receivers; they never fail.

Here trace_lines is a Lines<BufReader<File>> iterator. At this point I've created the channel and got the transmit and receive parts to the right places. The thread::sleep makes sure the thread doesn't exit before the messages are delivered.

thread::spawn(move || -> Result<(), Error> {
    match trace_lines.next() {
        None => break,
        Some(msg) => {
            println!{"Sending msg");
            match tx.send(msg) {
                Ok(_) => println!("Message sent"),
                Err(e) => println!("Error sending message {}", e)
            }
        }
    }
    std::thread::sleep(std::time::Duration::from_secs(1000000);
    println!("Thread exit");
    Ok(())
}

Here's a simplified version of the receiving side.

thread::spawn(move || -> Result<(),Error> {
    loop {
        println!("Waiting for message");
        let msg = rx.recv().expect("Read error");
        println!("Got message");
    }
}

Everything works about half the time. The other half of the time, my console shows

Waiting for message
Sending message
Message sent
Sending message
Message sent
Sending message
Message sent

I never get an error on either the sending or receiving side. Any idea what I could be doing wrong?

There's nothing obviously wrong here, but it is quite clear that you did not post the real code considering you somehow ended up with a sleep inside a match next to one of the arms. Perhaps the issue will be clear if you post the actual code with the issue?

Oops. Fixed in the original post.

Since the code is part of a product we're developing, I don't know if I'm allowed to post the actual code. Here's a more complete example.

thread::spawn(move || -> Result<(), Error> {
    loop {
        match trace_lines.next() {
            None => break,
            Some(record) => {
                 let trace_format = process_trace_record(record)?;
                 match trace_format {
                     TraceFormat::FormatA(msg) => {
                         txa.send(msg)?;
                      },
                      TraceFormat::FormatB(port, msg) => {
                          println!("Sending message");
                          let txb = self.get_tx(&port).expect("must be set");
                          match txb.send(msg) {
                               Ok(_) => println!("Message sent"),
                               Err(e) => println!("Message error {}", e)
                          } 
                      }
                 }
            }
        }
    }
    std::thread::sleep(std::time::Duration::from_secs(100000000));
    println!("Noc thread exit");
    Ok(())
});

I get the same value for port on runs that succeed and those that fail.

The receive side is pretty much what's in the original example. Only the names have been changed to keep me from getting yelled at.

I've clearly got some sort of message order race, because it works on some runs and not others. I just can't see what it could be. The printlns seem to show that things are happening in the right order, e.g., waiting for the message before it is sent.

Did you double-check that this sender from get_tx is actually the one that belongs to the receiver that you’re listening to on the receiving end?

Good thought, @steffahn. That channel is set up in single threaded code before the replay code runs, so it should be OK. Still, I'd like to check, but I don't know how. Printing with {:?} gives Sender { .. }.

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.