Help with crossbeam and TcpStream

I am writing an application that will receive TCP packets, do some processing, broadcast the processed packets, and save them off. I have taken the crossbeam example and modified it to create a sample application for what I want to do.

extern crate crossbeam;

use crossbeam::channel::bounded;
use std::net::TcpStream;
use std::io::Write;
use std::io::Read;

fn main() -> Result<(), std::io::Error> {
    let (snd1, rcv1) = bounded(10);
    //let (snd2, rcv2)<> = bounded(10);
    let n_msgs = 100;
    let mut stream = TcpStream::connect("127.0.0.1:10002").unwrap();
    let mut stream1 = stream.try_clone().unwrap();
    let mut buffer = [0; 1024];


    crossbeam::thread::scope(move |s| {
        // Test Sender thread
        s.spawn(|_| {
            
            for i in 0..n_msgs {
                stream.write(&[i; 10]);
                //snd1.send(i).unwrap();
                println!("TCP Sender sent {}", i);
            }
        });


         // Tcp Receiver, crossbeam sender thread
         s.spawn(|_| {
                let n_bytes = stream1.read(&mut buffer[..]).unwrap();
                snd1.send(&buffer[0..n_bytes]).unwrap();
                println!("TCP Receiver got {:?}", &buffer[0..n_bytes]);
            // Close the channel - this is necessary to exit
            // the for-loop in the worker
            drop(snd1);
        });

         // crossbeam Receiver thread
         s.spawn(|_| {    
            let msg = rcv1.recv().unwrap();
            println!("Received {:?}", msg);
         });

    }).unwrap();
    Ok(())
}

But I get this error message:

error[E0597]: `buffer` does not live long enough
  --> src\main.rs:32:28
   |
9  |     let (snd1, rcv1) = bounded(10);
   |          ---- lifetime `'1` appears in the type of `snd1`
...
32 |                 snd1.send(&buffer[0..n_bytes]).unwrap();
   |                 -----------^^^^^^-------------
   |                 |          |
   |                 |          borrowed value does not live long enough
   |                 argument requires that `buffer` is borrowed for `'1`
...
37 |         });
   |         - `buffer` dropped here while still borrowed

error: aborting due to previous error

For more information about this error, try `rustc --explain E0597`.
error: could not compile `test_threaded_tcp`.

Ultimately, the real application won't have a TCP sender; someone else sends my app data, and the sender here is only for testing. What I need to do is receive TCP data and then use crossbeam channels to send the received data to another processing thread.

Removing the move from the scope

- crossbeam::thread::scope(move |s| {
+ crossbeam::thread::scope(|s| {

changes the error to

error[E0597]: `buffer` does not live long enough
  --> src/main.rs:28:45
   |
16 |     crossbeam::thread::scope(|s| {
   |                              --- value captured here
...
28 |             let n_bytes = stream1.read(&mut buffer).unwrap();
   |                                             ^^^^^^ borrowed value does not live long enough
...
44 | }
   | -
   | |
   | `buffer` dropped here while still borrowed
   | borrow might be used here, when `rcv1` is dropped and runs the `Drop` code for type `crossbeam::crossbeam_channel::Receiver`
   |
   = note: values in a scope are dropped in the opposite order they are defined

After that, defining buffer first (so that the receiver is dropped first) makes the code compile.

fn main() -> Result<(), std::io::Error> {
    let mut buffer = [0; 1024];
    let (snd1, rcv1) = bounded(10);
    // ...
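
The compiler note about drop order can be checked in isolation. The following is a minimal sketch using only the standard library; the `Noisy` type and the `drop_order` function are hypothetical names made up for this illustration and are not part of crossbeam or the original program. Each value records its name when it is dropped, so the returned list shows which of two locals goes away first.

```rust
use std::cell::RefCell;

// Hypothetical helper type: pushes its name into a shared log when dropped.
struct Noisy<'a> {
    name: &'static str,
    log: &'a RefCell<Vec<&'static str>>,
}

impl Drop for Noisy<'_> {
    fn drop(&mut self) {
        self.log.borrow_mut().push(self.name);
    }
}

// Returns the names of two locals in the order they were dropped.
fn drop_order() -> Vec<&'static str> {
    let log = RefCell::new(Vec::new());
    {
        // Declared first, so it is dropped last.
        let _buffer = Noisy { name: "buffer", log: &log };
        // Declared second, so it is dropped first.
        let _channel = Noisy { name: "channel", log: &log };
    }
    log.into_inner()
}

fn main() {
    // prints ["channel", "buffer"]
    println!("{:?}", drop_order());
}
```

Anything that may hold a borrow of `buffer` when it is dropped (here, the channel) therefore has to be declared after `buffer`.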

As your code was written, the buffer was moved into the scope closure, so there was no longer any guarantee that it would outlive the threads. spawn's docs say:

this thread is scoped, meaning it's guaranteed to terminate before the scope exits, allowing it to reference variables outside the scope

but because of the move, buffer was no longer outside the scope.
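
For the real application, where the receiver reads in a loop, one way to sidestep the borrow entirely is to send owned copies of the bytes instead of slices of the buffer. The sketch below is only an illustration under stated assumptions: it swaps in the standard library's `std::thread::scope` (Rust 1.63 or later) and `std::sync::mpsc::sync_channel` for crossbeam's scope and `bounded`, so that it runs without external crates, and `relay_once` plus the loopback listener are hypothetical test scaffolding, not part of the original program.

```rust
use std::io::{Read, Write};
use std::net::{TcpListener, TcpStream};
use std::sync::mpsc::sync_channel;
use std::thread;

// Hypothetical helper: sends `payload` over a loopback TCP connection,
// reads it back on a receiver thread, and forwards owned copies of the
// received bytes over a bounded channel to a worker thread.
fn relay_once(payload: &[u8]) -> std::io::Result<Vec<u8>> {
    // Port 0 asks the OS for any free port (test-only convenience).
    let listener = TcpListener::bind("127.0.0.1:0")?;
    let mut client = TcpStream::connect(listener.local_addr()?)?;
    let (mut server, _) = listener.accept()?;

    // The channel carries Vec<u8>, so no message borrows from a buffer.
    let (snd, rcv) = sync_channel::<Vec<u8>>(10);

    let received = thread::scope(|s| {
        // Test sender thread: `payload` is borrowed from outside the scope.
        // `client` is dropped when the closure ends, closing the connection.
        s.spawn(move || client.write_all(payload).unwrap());

        // TCP receiver thread: owns its buffer and sends copies of each chunk.
        s.spawn(move || {
            let mut buffer = [0u8; 1024];
            loop {
                let n = server.read(&mut buffer).unwrap();
                if n == 0 {
                    break; // peer closed the connection
                }
                snd.send(buffer[..n].to_vec()).unwrap();
            }
            // `snd` is dropped here, which closes the channel.
        });

        // Worker thread: drains the channel until it is closed.
        let worker = s.spawn(move || {
            let mut all = Vec::new();
            for msg in rcv {
                all.extend(msg);
            }
            all
        });
        worker.join().unwrap()
    });
    Ok(received)
}

fn main() -> std::io::Result<()> {
    let echoed = relay_once(&[1, 2, 3])?;
    println!("worker received {:?}", echoed);
    Ok(())
}
```

Copying each chunk into a `Vec<u8>` costs an allocation per message, but it lets the receiver reuse its buffer on the next read while earlier messages are still queued in the channel.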


That was it. I could have sworn I tried that. Guess not, though; it works! Thanks.
