Concurrently reading/parsing pcap file

I'm still relatively new to Rust and have been working on a function that reads packets from a pcap file and parses them concurrently. What I would like to do is read a packet from the file and spawn a thread to parse the packet. I have included the preliminary code below. I get the following error:
error[E0499]: cannot borrow pcap_reader as mutable more than once at a time

I assume this is down to how I am structuring reading from the pcap, would someone be able to point me in the right direction?

    let mut handles = Vec::new();
    let file_in = File::open("pcap_file.pcap").expect("Error opening file");
    let mut pcap_reader = PcapReader::new(file_in).unwrap();


    while let Some(pkt) = pcap_reader.next_packet() {
        let processed_pkt = Arc::clone(&processed_pkt);
        let handle = thread::spawn(move|| {
            let pkt = pkt.unwrap(); // Adding parsing here 
            let mut processed_pkt = processed_pkt.lock().unwrap();
            processed_pkt.push(pkt)
        });

        handles.push(handle);

    }

    for handle in handles {
        handle.join().unwrap()
    }
    Arc::try_unwrap(processed_pkt).unwrap().into_inner().unwrap();
    }

Assuming PcapReader is from the pcap_file crate, the packet is borrowing data from the reader. You can call into_owned to get a packet that has copied the data it needs out of the reader so it can be sent to another thread safely.
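To illustrate the idea without depending on the crate, here is a minimal sketch. The `Packet` type and `payload_len_on_thread` are hypothetical stand-ins, not the pcap_file API; I'm assuming the packet keeps its payload in a `Cow`, which may borrow from the reader's buffer. `thread::spawn` requires its closure to be `'static`, so the borrowed packet has to be converted into an owned one first.

```rust
use std::borrow::Cow;
use std::thread;

// Hypothetical stand-in for a packet whose payload may borrow from a reader's buffer.
struct Packet<'a> {
    data: Cow<'a, [u8]>,
}

impl<'a> Packet<'a> {
    // Copy any borrowed bytes so the result no longer depends on the buffer.
    fn into_owned(self) -> Packet<'static> {
        Packet {
            data: Cow::Owned(self.data.into_owned()),
        }
    }
}

// Moves one packet to a new thread and returns its payload length.
fn payload_len_on_thread(buffer: &[u8]) -> usize {
    let borrowed = Packet {
        data: Cow::Borrowed(buffer),
    };
    // `borrowed` is tied to the lifetime of `buffer`, so it cannot be moved
    // into `thread::spawn` directly. The owned copy can.
    let owned = borrowed.into_owned();
    thread::spawn(move || owned.data.len()).join().unwrap()
}

fn main() {
    let buffer = vec![0xde, 0xad, 0xbe, 0xef];
    println!("payload length: {}", payload_len_on_thread(&buffer));
}
```

The cost is one copy of the payload per packet, which is usually small compared to spawning a thread.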

I tried that and it fixed both the error and another issue I was having, thank you for your help! Now on to verifying if the code is doing what I think it's doing ...

I restructured the code so it now looks like this:

    let file_in = File::open("pcap_file.pcap").expect("Error opening file");
    let mut pcap_reader = PcapReader::new(file_in).unwrap();

    while let Some(pkt) = pcap_reader.next_packet() {
        let pkt = pkt.unwrap().into_owned();
        let processed_pkt = Arc::clone(&processed_pkt);
        let handle = thread::spawn(move|| {
            let pkt_data = pkt.data; // Adding parsing here 
            let mut processed_pkt = processed_pkt.lock().unwrap();
            processed_pkt.push(pkt_data)
        });

        handles.push(handle);

    }

    for handle in handles {
        handle.join().unwrap()
    }
    Arc::try_unwrap(processed_pkt).unwrap().into_inner().unwrap();
    }

The overhead of spawning threads is high enough that you should probably spawn just one thread at the start, a parsing thread, and send packets to it from your packet-reading thread through a channel. Keep the Vec of parsed results in the parsing thread so that you can return it at the end and avoid the whole Arc/Mutex dance.

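// Pseudo code
let parsed_packets: Vec<_> = thread::scope(|s| {
    // A bounded channel keeps the reader from running far ahead of the parser.
    // The capacity of 64 is arbitrary.
    let (sender, receiver) = sync_channel(64);
    s.spawn(move || {
        while let Some(packet) = pcap_reader.next_packet() {
            // Copy the data out of the reader before sending it to another thread.
            sender.send(packet.unwrap().into_owned()).unwrap();
        }
        // `sender` is dropped here, which ends the receiver's iteration below.
    });
    receiver.into_iter().map(parse_packet).collect()
});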
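For reference, here is a version of the same shape that compiles using only the standard library. It is a sketch under assumptions: a `Vec<Vec<u8>>` stands in for the pcap reader, `parse_packet` is a hypothetical parser that just sums the payload bytes, and the channel capacity of 64 is arbitrary.

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

// Hypothetical parser: sums the payload bytes. Replace with real parsing.
fn parse_packet(raw: Vec<u8>) -> u32 {
    raw.iter().map(|&b| u32::from(b)).sum()
}

// One thread feeds packets into a bounded channel; the calling thread parses them.
fn parse_all(packets: Vec<Vec<u8>>) -> Vec<u32> {
    thread::scope(|s| {
        let (sender, receiver) = sync_channel::<Vec<u8>>(64);
        s.spawn(move || {
            for packet in packets {
                // Stop early if the receiving side has gone away.
                if sender.send(packet).is_err() {
                    break;
                }
            }
            // `sender` is dropped here, which ends the iteration below.
        });
        receiver.into_iter().map(parse_packet).collect::<Vec<u32>>()
    })
}

fn main() {
    let packets = vec![vec![1, 2, 3], vec![], vec![255, 1]];
    println!("{:?}", parse_all(packets));
}
```

With one sender and one receiver, results come out in the order the packets were read, so no sorting or locking is needed afterwards.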

I really appreciate the feedback, thank you!

I would like to parse packets with a number of concurrent threads if possible, but what I'm understanding is that it would be simpler to have this as a single-threaded program?

I'm not convinced it wouldn't be simpler and faster to do it in a single thread. Benchmark your code.

There are 2 threads in my example, the packet receiving and the parsing thread. This would be simpler and very probably perform better than spawning an unbounded number of threads. Remember that threads are expensive to spawn, especially compared to small in-memory tasks such as parsing.
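If benchmarking shows that parsing really is the bottleneck, one way to use several threads without spawning one per packet is to split the packets across a fixed number of workers. This is only a sketch under assumptions: it expects all packets to already be in memory as owned buffers, and `parse_packet` and `parse_with_workers` are hypothetical names, with the parser here just returning the payload length.

```rust
use std::thread;

// Hypothetical parser: returns the payload length. Replace with real parsing.
fn parse_packet(raw: &[u8]) -> usize {
    raw.len()
}

// Parses `packets` on at most `workers` scoped threads, preserving input order.
fn parse_with_workers(packets: &[Vec<u8>], workers: usize) -> Vec<usize> {
    if packets.is_empty() {
        return Vec::new();
    }
    let workers = workers.max(1);
    // Ceiling division so that every packet lands in some chunk.
    let chunk_size = (packets.len() + workers - 1) / workers;
    thread::scope(|s| {
        // Spawn every worker first, then join, so the chunks run in parallel.
        let handles: Vec<_> = packets
            .chunks(chunk_size)
            .map(|chunk| {
                s.spawn(move || chunk.iter().map(|p| parse_packet(p)).collect::<Vec<_>>())
            })
            .collect();
        handles
            .into_iter()
            .flat_map(|handle| handle.join().unwrap())
            .collect()
    })
}

fn main() {
    let packets = vec![vec![0u8; 3], vec![0u8; 1], vec![], vec![0u8; 7], vec![0u8; 2]];
    println!("{:?}", parse_with_workers(&packets, 2));
}
```

The number of threads is bounded by `workers` no matter how many packets the file contains, and the scoped threads can borrow the packets directly, so there is no Arc or Mutex involved.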


Thanks again for the advice, it's much appreciated!
