Debugging while loop exiting

Hello all,

I've written a program that uses the pcap crate to capture packets, filters for a specific protocol, parses the data in the messages, and logs any changes between messages for each unique flow. The program works with low traffic levels. However, when I increase the amount of traffic to be parsed (i.e. by running it in the production environment), the program exits without any errors, and I'm struggling to figure out why.

The bulk of the program runs in this while loop:

    while let Ok(packet) = cap.next_packet() {
        ....
    }

I know that the while loop is exiting, because I print to the console after it. I would have thought that meant cap.next_packet() was returning an error. However, I placed a match statement after the while loop to print the result of cap.next_packet(), and it consistently returns Ok, so I'm not sure why the while loop would exit.

The exit does not happen at a consistent point: sometimes it's after a few hundred thousand packets, sometimes it's after only a few packets.

Printing out cap.stats does not show any dropped packets. I disabled all the logging to console/file, and that had no effect.

Does anybody have any suggestions on how I could debug this? I'm still fairly new to this, so apologies if I have missed something very obvious. Thanks!

Full code below:

extern crate chrono;
extern crate pcap;
extern crate pnet;

use chrono::Local;
use pcap::{Capture, Device, Packet};

use std::collections::HashMap;
use std::convert::TryInto;
use std::env;
use std::fs::OpenOptions;
use std::hash::{Hash, Hasher};
use std::net::{IpAddr, Ipv4Addr};
use std::io::{stdin,stdout, Write};

#[macro_use]
extern crate slog;
extern crate slog_async;
extern crate slog_json;
extern crate slog_stream;
extern crate slog_term;
use slog::Drain;

#[derive(Debug)]
struct FlowId {
    src_ip: String,
    src_port: u16,
    dst_ip: String,
    dst_port: u16,
}
#[derive(Debug)]
struct FlowInfo {
    payload: Vec<u8>,
    packet_count: u32,
    capture_time_sec: i32,
    capture_time_usec: i32,
}
impl Hash for FlowId {
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.src_ip.hash(state);
        self.src_port.hash(state);
        self.dst_ip.hash(state);
        self.dst_port.hash(state);
    }
}

//fn main() -> Result<(), std::io::Error> {
fn main() {
    dotenv::dotenv().ok();

    let args: Vec<String> = env::args().collect();

    // Print arguments
    for argument in args.iter() {
        println!("{}", argument);
    }
    // Initialize logging
    let drain = slog_async::Async::new(
        slog::Duplicate::new(
            slog::Filter::new(
                slog_term::FullFormat::new(
                    slog_term::PlainSyncDecorator::new(std::io::stderr(),)).build(),
                |record: &slog::Record| record.level().is_at_least(slog::Level::Warning),
            ),
            slog_term::FullFormat::new(slog_term::PlainSyncDecorator::new(std::io::stdout())).build(),
        ).fuse()
    ).chan_size(4096).build().fuse();
    
    let start_time = Local::now();
    let log_path = format!("udp_sniffer-{}.log", start_time.format("%Y-%m-%d-%H-%M-%S"));
    let file = OpenOptions::new()
       .create(true)
       .write(true)
       .truncate(true)
       .open(log_path)
       .unwrap();
 
     let file_decorator = slog_term::PlainDecorator::new(file);
     let file_drain = slog_term::FullFormat::new(file_decorator).build().fuse();
     let file_drain = slog_async::Async::new(file_drain).build().fuse();
 
     //let _log = slog::Logger::root(file_drain, o!());
 

    //let root_logger = slog::Logger::root(drain, o!());
    let root_logger = slog::Logger::root(slog::Duplicate::new(drain, file_drain).fuse(), o!());
    info!(root_logger, "";
        "Application started at" => format!("{}", chrono::Utc::now()));
    
    // Get a list of all available network devices
    let devices_list: Vec<Device> = Device::list().unwrap();
    /*
    // Prompt the user to select a network device, only display interfaces with IPv4 addresses
    println!("Available network interfaces:");
    for (i, device) in devices_list.iter().enumerate() {
        
        for address in &device.addresses {
            if address.addr.is_ipv4() {
                println!("{}: {} - {:?} - {}", i+1, device.name, device.desc.clone().unwrap(), address.addr);
            }
        }
    }
    
    print!("Enter the number of the interface you want to use: ");
    stdout().flush()?;
    
    let mut device_selection = String::new();
    stdin().read_line(&mut device_selection)?;
    let device_selection = device_selection.trim();
    
    let selected_interface: usize = device_selection.parse().unwrap();
     */
    let selected_device: Device = devices_list[0].clone();
    let device_name = selected_device.desc.clone().unwrap();
    let mut cap = Capture::from_device(selected_device).unwrap()
    .snaplen(65535)
    .promisc(true)
    .immediate_mode(false)
    .buffer_size(1024 * 1024 * 1024)
    .open().unwrap();
    
    // Print the name of the interface
    //info!(root_logger, "Capturing packets from interface {:?}", device_name);
    // Set the filter to only capture packets that are 646 bytes long
    cap.filter("len == 646", true).unwrap();

    let mut payloads:HashMap<u64, FlowInfo> = HashMap::new();
    let mut buf_ptr:usize;
    let mut gpo_state: [u32; 4] = [0; 4];
    let mut last_gpo_state: [u32; 4] = [0; 4];
    let mut level_state: [i16; 10] = [0; 10];
    let mut level_state_f32: [f32; 10] = [0.0; 10];
    let mut label: [[u8; 16]; 32] = [[0; 16]; 32];
    let mut last_label: [[u8; 16]; 32] = [[0; 16]; 32];
    let mut mic_arb: [(u8, u8, u8, u8, u16); 8] = [(0, 0, 0, 0, 0); 8];
    let mut last_mic_arb: [(u8, u8, u8, u8, u16); 8] = [(0, 0, 0, 0, 0); 8];
    let mut packet_count: u32;
    let mut capture_time_sec: i32;
    let mut capture_time_usec: i32;
    let mut packet_counter: i32 = 0;
    while let Ok(packet) = cap.next_packet() {
        if packet.data.len() == 0 {
            println!("breaking!!!!");
            continue;
        }
        packet_counter += 1;
        println!("Packet number: {}", packet_counter);
        let udp_payload = &packet.data[42..646];
        // Collect flow information and create a unique hash ID
        let src_ip_bytes = &packet.data[26..30];
        let src_ip = IpAddr::from(Ipv4Addr::from(u32::from_be_bytes(src_ip_bytes.try_into().unwrap())));
        let dst_ip_bytes = &packet.data[30..34];
        let dst_ip = IpAddr::from(Ipv4Addr::from(u32::from_be_bytes(dst_ip_bytes.try_into().unwrap())));
        let src_port = u16::from_be_bytes([packet.data[34], packet.data[35]]);
        let dst_port = u16::from_be_bytes([packet.data[36], packet.data[37]]);

        let flow_id = FlowId {
            src_ip: src_ip.to_string(),
            src_port: src_port,
            dst_ip: dst_ip.to_string(),
            dst_port: dst_port,
        };
        let mut hasher = std::collections::hash_map::DefaultHasher::new();
        flow_id.hash(&mut hasher);
        let flow_hash = hasher.finish();
        
        // Check if version is 2--if not, discard packet; only way to tell if this is a udpgnet packet
        let version = udp_payload[0];
        if version != 2 {
            error!(root_logger, "Invalid packet received, flow ID: {:?}", flow_id);
            continue;
        }

        let retrieved_payload = payloads.get(&flow_hash);
        match retrieved_payload {
            None => {
                packet_count = 1;
                info!(root_logger, "{:?} - New flow detected: {:?}", flow_hash, flow_id);
                capture_time_sec = packet.header.ts.tv_sec;
                capture_time_usec = packet.header.ts.tv_usec;

            }
            Some(_inner) => {
                packet_count = retrieved_payload.unwrap().packet_count +1;

                if retrieved_payload.unwrap().payload == udp_payload {
                    // No change in payload, do nothing
                    //info!(root_logger, "{:?} - Packet Count: {}", flow_hash, packet_count);
                    //capture_time_sec = retrieved_payload.unwrap().capture_time_sec;
                    //capture_time_usec = retrieved_payload.unwrap().capture_time_usec;
                    continue;
                    
                } else {
                    // Calculate time since last change
                    capture_time_sec = packet.header.ts.tv_sec;
                    capture_time_usec = packet.header.ts.tv_usec;
                    let time_delta = (packet.header.ts.tv_sec - retrieved_payload.unwrap().capture_time_sec) as f64 + ((packet.header.ts.tv_usec - retrieved_payload.unwrap().capture_time_usec) as f64 / 1000000.0);
                    
                    info!(root_logger, "{:?} - New message received - Flow Packet Count: {} - Time since last change: {:?}", flow_hash, packet_count, time_delta);
                    
                    // Create message & last_message buffers
                    let message: Vec<u8> = udp_payload.to_vec();
                    let last_message = retrieved_payload.unwrap().payload.clone();

                    // Parse GPO states
                    buf_ptr = 8;
                    for i in 0..4 {
                        gpo_state[i] = u32::from_be_bytes(message[buf_ptr..buf_ptr+4].try_into().unwrap());
                        last_gpo_state[i] = u32::from_be_bytes(last_message[buf_ptr..buf_ptr+4].try_into().unwrap());
                        buf_ptr += 4;
                    }
                    for i in 0..4 {
                        for j in 0..32 {
                            let gpo = (gpo_state[i] >> j) & 1;
                            let last_gpo = (last_gpo_state[i] >> j) & 1;
                                if gpo != last_gpo {
                                    info!(root_logger, "{:?} - GPO: {} State: {}",flow_hash, i*32+j+1, gpo);
                                }
                            }
                    }


                    // Parse Level states
                    buf_ptr = 24;
                    for i in 0..10 {
                        level_state[i] = i16::from_be_bytes(message[buf_ptr..buf_ptr+2].try_into().unwrap());
                        level_state_f32[i] = level_state[i] as f32;
                        level_state_f32[i] /= 16.0;
                        buf_ptr += 2;
                    }
                    buf_ptr = 24;
                    for i in 0..10 {
                        if message[buf_ptr..buf_ptr+2] != last_message[buf_ptr..buf_ptr+2] {
                            info!(root_logger, "{:?} - Level: {} State: {}", flow_hash, i+1, level_state_f32[i]);
                        }
                        buf_ptr +=2;
                    }


                    // Parse Text labels
                    buf_ptr = 44;
                    for i in 0..32 {
                        label[i][..16].copy_from_slice(&message[buf_ptr..buf_ptr+16]);
                        last_label[i][..16].copy_from_slice(&last_message[buf_ptr..buf_ptr+16]);
                        buf_ptr += 16;
                    }

                    for i in 0..32 {
                        if label[i] != last_label[i] {
                            let label = String::from_utf8_lossy(&label[i]);
                            info!(root_logger, "{:?} - Label {} State: {:?}", flow_hash, i+1, label.to_string());

                        }
                    }

                    // Parse mic arbitration
                    buf_ptr = 556;
                    for i in 0..8 {
                        let nr = message[buf_ptr];
                        let rumble = message[buf_ptr + 1];
                        let phantom = message[buf_ptr + 2];
                        let pad = message[buf_ptr + 3];
                        let gain = u16::from_be_bytes(message[buf_ptr + 4..buf_ptr + 6].try_into().unwrap());
                        mic_arb[i] = (nr, rumble, phantom, pad, gain);
                        let last_nr = last_message[buf_ptr];
                        let last_rumble = last_message[buf_ptr + 1];
                        let last_phantom = last_message[buf_ptr + 2];
                        let last_pad = last_message[buf_ptr + 3];
                        let last_gain = u16::from_be_bytes(last_message[buf_ptr + 4..buf_ptr + 6].try_into().unwrap());
                        last_mic_arb[i] = (last_nr, last_rumble, last_phantom, last_pad, last_gain);
                        buf_ptr += 6;
                    }

                    for i in 0..8 {
                        if mic_arb[i] != last_mic_arb[i] {
                            info!(root_logger, "{:?} - Mic Arb: {} NR: {} Rumble: {} Phantom: {} Pad: {} Gain: {}", flow_hash, i+1, mic_arb[i].0, mic_arb[i].1, mic_arb[i].2, mic_arb[i].3, mic_arb[i].4);
                        }
                    }
                    
                }
            },
        }
        let flow_info = FlowInfo {
            payload: udp_payload.to_vec(),
            packet_count: packet_count,
            capture_time_sec: capture_time_sec,
            capture_time_usec: capture_time_usec,
        };
        
        payloads.insert(flow_hash,flow_info);

    }
    //Ok(())
    cap.next_packet();
    
    cap.list_datalinks();

    

    match cap.next_packet() {
        Ok(Packet) => {
            println!("Result: {:?}", cap.next_packet());
            println!("Stats: {:?}", cap.stats());
        },
        Err(err) => {
            println!("Error: {}", err);
            println!("Done");
        },
    }
}

But in that case, you are printing the result of the next call. The call that returned an Err (causing the loop to exit) is never seen, since you don't bind the Err case anywhere. If you use a match inside the loop instead, then you can handle and print both cases.
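A minimal sketch of that suggestion, using only the standard library. `FakeCapture` and `run` are hypothetical stand-ins invented for illustration, not the pcap crate's types; the point is only the shape of the loop, where the `Err` value is bound and reported before the loop ends.

```rust
// Hypothetical stand-in for a capture handle (NOT the pcap API).
// It replays a fixed list of results, one per call.
struct FakeCapture {
    results: Vec<Result<Vec<u8>, String>>,
}

impl FakeCapture {
    fn next_packet(&mut self) -> Result<Vec<u8>, String> {
        if self.results.is_empty() {
            Err("no more packets".to_string())
        } else {
            self.results.remove(0)
        }
    }
}

// Processes packets until the first error and returns how many packets
// were handled together with the error that ended the loop.
fn run(cap: &mut FakeCapture) -> (usize, String) {
    let mut count = 0;
    loop {
        match cap.next_packet() {
            Ok(packet) => {
                count += 1;
                println!("packet {} has {} bytes", count, packet.len());
            }
            Err(err) => {
                // The error is bound here, so it can be logged before exiting.
                eprintln!("capture stopped after {} packets: {}", count, err);
                return (count, err);
            }
        }
    }
}

fn main() {
    let mut cap = FakeCapture {
        results: vec![
            Ok(vec![1, 2, 3]),
            Ok(vec![4]),
            Err("timeout expired".to_string()),
            Ok(vec![5]),
        ],
    };
    let (count, err) = run(&mut cap);
    println!("handled {} packets, stopped on: {}", count, err);
}
```

In the real program the same `match` would wrap `cap.next_packet()` in place of the `while let`, and the printed error should say why the loop ended.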

The error itself might happen due to packets being dropped. This can be the case when, e.g., you are using a protocol that doesn't enforce delivery. Does the interface you are listening on use such an unreliable protocol, for example UDP?

So I do have a match for the err case, but it is after the while loop--are you saying the match for the err case should be inside the while loop?

The protocol is indeed UDP based, but the cap.stats element doesn't show any packets being dropped. Even if packets were being dropped, my understanding of the cap.next_packet() method is that it should simply return the next packet in the buffer, so a packet being dropped should have no effect?

Thanks!

Obviously, the match must be inside the loop, since otherwise the error gets lost. If you call the function again after the loop, its return value is… well, whatever it is, but that's not the return value of the previous call (the one made inside the loop).
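To illustrate why a call made after the loop can report Ok even though the loop just ended on an Err, here is a small standard-library sketch. The queue of results and the helper names `next_result` and `demo` are made up for the example, and it assumes the failure is transient, which the thread has not confirmed.

```rust
use std::collections::VecDeque;

type Item = Result<u32, &'static str>;

// Hypothetical source of results: each call consumes one queued element.
fn next_result(queue: &mut VecDeque<Item>) -> Item {
    queue.pop_front().unwrap_or(Err("queue empty"))
}

// Returns the values seen inside the loop and the result of one extra
// call made after the loop has already exited.
fn demo() -> (Vec<u32>, Item) {
    let mut queue: VecDeque<Item> =
        VecDeque::from(vec![Ok(1), Ok(2), Err("transient error"), Ok(3)]);

    let mut seen = Vec::new();
    // The loop ends when the Err is returned, but nothing binds that Err,
    // so its value is dropped without being printed.
    while let Ok(n) = next_result(&mut queue) {
        seen.push(n);
    }

    // This is a separate call: it returns the element AFTER the one
    // that ended the loop.
    let after_loop = next_result(&mut queue);
    (seen, after_loop)
}

fn main() {
    let (seen, after_loop) = demo();
    println!("seen in loop: {:?}", seen);
    println!("call after loop: {:?}", after_loop);
}
```

The "transient error" element is consumed by the `while let` and never shown, which matches the symptom of a loop that exits while a later call looks healthy.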