Hello all,
I've written a program that uses the pcap crate to capture packets, filters for a specific protocol, parses the data in the messages and logs any changes between messages for each unique flow. The program works with low traffic levels, however when I increase the amount of traffic to be parsed (i.e. by running it in the production environment), the program exits without any errors, and I'm struggling to figure out why.
The bulk of the program runs in this while loop:
#141 -> while let Ok(packet) = cap.next_packet() {
....
}
I know that the while loop is exiting, because I print to the console after the loop. I would have thought that meant cap.next_packet() is returning an error. However, I placed a match statement after the while loop to print the result of cap.next_packet(), and it consistently returns `Ok`, so I'm not sure why the while loop would exit.
The exit does not happen at a consistent point: sometimes it's after a few hundred thousand packets, sometimes it's after only a few packets.
Printing out cap.stats does not show any dropped packets. I disabled all the logging to console/file, and that had no effect.
Does anybody have any suggestions on how I could debug this? I'm still fairly new to this, so apologies if I have missed something very obvious. Thanks!
Full code below:
extern crate chrono;
extern crate pcap;
extern crate pnet;
use chrono::Local;
use pcap::{Capture, Device, Packet};
use std::collections::HashMap;
use std::convert::TryInto;
use std::env;
use std::fs::OpenOptions;
use std::hash::{Hash, Hasher};
use std::net::{IpAddr, Ipv4Addr};
use std::io::{stdin,stdout, Write};
#[macro_use]
extern crate slog;
extern crate slog_async;
extern crate slog_json;
extern crate slog_stream;
extern crate slog_term;
use slog::Drain;
/// Identifies one flow by its source and destination address/port pair.
///
/// The capture loop hashes this value (see the `Hash` impl) and uses the
/// resulting `u64` as the key of the per-flow state map.
#[derive(Debug)]
struct FlowId {
    /// Source IPv4 address, rendered as text.
    src_ip: String,
    /// Source port read from the packet.
    src_port: u16,
    /// Destination IPv4 address, rendered as text.
    dst_ip: String,
    /// Destination port read from the packet.
    dst_port: u16,
}
/// State remembered for one flow between packets.
#[derive(Debug)]
struct FlowInfo {
    /// Payload bytes most recently stored for this flow; new packets are
    /// compared against it to detect changes.
    payload: Vec<u8>,
    /// Packet counter for this flow, as maintained by the capture loop.
    packet_count: u32,
    /// Seconds part (`tv_sec`) of the capture timestamp recorded with `payload`.
    capture_time_sec: i32,
    /// Microseconds part (`tv_usec`) of the capture timestamp recorded with `payload`.
    capture_time_usec: i32,
}
impl Hash for FlowId {
fn hash<H: Hasher>(&self, state: &mut H) {
self.src_ip.hash(state);
self.src_port.hash(state);
self.dst_ip.hash(state);
self.dst_port.hash(state);
}
}
//fn main() -> Result<(), std::io::Error> {
fn main() {
dotenv::dotenv().ok();
let args: Vec<String> = env::args().collect();
// Print arguments
for argument in args.iter() {
println!("{}", argument);
}
// Initialize logging
let drain = slog_async::Async::new(
slog::Duplicate::new(
slog::Filter::new(
slog_term::FullFormat::new(
slog_term::PlainSyncDecorator::new(std::io::stderr(),)).build(),
|record: &slog::Record| record.level().is_at_least(slog::Level::Warning),
),
slog_term::FullFormat::new(slog_term::PlainSyncDecorator::new(std::io::stdout())).build(),
).fuse()
).chan_size(4096).build().fuse();
let start_time = Local::now();
let log_path = format!("udp_sniffer-{}.log", start_time.format("%Y-%m-%d-%H-%M-%S"));
let file = OpenOptions::new()
.create(true)
.write(true)
.truncate(true)
.open(log_path)
.unwrap();
let file_decorator = slog_term::PlainDecorator::new(file);
let file_drain = slog_term::FullFormat::new(file_decorator).build().fuse();
let file_drain = slog_async::Async::new(file_drain).build().fuse();
//let _log = slog::Logger::root(file_drain, o!());
//let root_logger = slog::Logger::root(drain, o!());
let root_logger = slog::Logger::root(slog::Duplicate::new(drain, file_drain).fuse(), o!());
info!(root_logger, "";
"Application started at" => format!("{}", chrono::Utc::now()));
// Get a list of all available network devices
let devices_list: Vec<Device> = Device::list().unwrap();
/*
// Prompt the user to select a network device, only display interfaces with IPv4 addresses
println!("Available network interfaces:");
for (i, device) in devices_list.iter().enumerate() {
for address in &device.addresses {
if address.addr.is_ipv4() {
println!("{}: {} - {:?} - {}", i+1, device.name, device.desc.clone().unwrap(), address.addr);
}
}
}
print!("Enter the number of the interface you want to use: ");
stdout().flush()?;
let mut device_selection = String::new();
stdin().read_line(&mut device_selection)?;
let device_selection = device_selection.trim();
let selected_interface: usize = device_selection.parse().unwrap();
*/
let selected_device: Device = devices_list[0].clone();
let device_name = selected_device.desc.clone().unwrap();
let mut cap = Capture::from_device(selected_device).unwrap()
.snaplen(65535)
.promisc(true)
.immediate_mode(false)
.buffer_size(1024 * 1024 * 1024)
.open().unwrap();
// Print the name of the interface
//info!(root_logger, "Capturing packets from interface {:?}", device_name);
// Set the filter to only capture packets that are 646 bytes long
cap.filter("len == 646", true).unwrap();
let mut payloads:HashMap<u64, FlowInfo> = HashMap::new();
let mut buf_ptr:usize;
let mut gpo_state: [u32; 4] = [0; 4];
let mut last_gpo_state: [u32; 4] = [0; 4];
let mut level_state: [i16; 10] = [0; 10];
let mut level_state_f32: [f32; 10] = [0.0; 10];
let mut label: [[u8; 16]; 32] = [[0; 16]; 32];
let mut last_label: [[u8; 16]; 32] = [[0; 16]; 32];
let mut mic_arb: [(u8, u8, u8, u8, u16); 8] = [(0, 0, 0, 0, 0); 8];
let mut last_mic_arb: [(u8, u8, u8, u8, u16); 8] = [(0, 0, 0, 0, 0); 8];
let mut packet_count: u32;
let mut capture_time_sec: i32;
let mut capture_time_usec: i32;
let mut packet_counter: i32 = 0;
while let Ok(packet) = cap.next_packet() {
if packet.data.len() == 0 {
println!("breaking!!!!");
continue;
}
packet_counter += 1;
println!("Packet number: {}", packet_counter);
let udp_payload = &packet.data[42..646];
// Collect flow information and create a unique hash ID
let src_ip_bytes = &packet.data[26..30];
let src_ip = IpAddr::from(Ipv4Addr::from(u32::from_be_bytes(src_ip_bytes.try_into().unwrap())));
let dst_ip_bytes = &packet.data[30..34];
let dst_ip = IpAddr::from(Ipv4Addr::from(u32::from_be_bytes(dst_ip_bytes.try_into().unwrap())));
let src_port = u16::from_be_bytes([packet.data[34], packet.data[35]]);
let dst_port = u16::from_be_bytes([packet.data[36], packet.data[37]]);
let flow_id = FlowId {
src_ip: src_ip.to_string(),
src_port: src_port,
dst_ip: dst_ip.to_string(),
dst_port: dst_port,
};
let mut hasher = std::collections::hash_map::DefaultHasher::new();
flow_id.hash(&mut hasher);
let flow_hash = hasher.finish();
// Check if version is 2--if not, discard packet; only way to tell if this is a udpgnet packet
let version = udp_payload[0];
if version != 2 {
error!(root_logger, "Invalid packet received, flow ID: {:?}", flow_id);
continue;
}
let retrieved_payload = payloads.get(&flow_hash);
match retrieved_payload {
None => {
packet_count = 1;
info!(root_logger, "{:?} - New flow detected: {:?}", flow_hash, flow_id);
capture_time_sec = packet.header.ts.tv_sec;
capture_time_usec = packet.header.ts.tv_usec;
}
Some(_inner) => {
packet_count = retrieved_payload.unwrap().packet_count +1;
if retrieved_payload.unwrap().payload == udp_payload {
// No change in payload, do nothing
//info!(root_logger, "{:?} - Packet Count: {}", flow_hash, packet_count);
//capture_time_sec = retrieved_payload.unwrap().capture_time_sec;
//capture_time_usec = retrieved_payload.unwrap().capture_time_usec;
continue;
} else {
// Calculate time since last change
capture_time_sec = packet.header.ts.tv_sec;
capture_time_usec = packet.header.ts.tv_usec;
let time_delta = (packet.header.ts.tv_sec - retrieved_payload.unwrap().capture_time_sec) as f64 + ((packet.header.ts.tv_usec - retrieved_payload.unwrap().capture_time_usec) as f64 / 1000000.0);
info!(root_logger, "{:?} - New message received - Flow Packet Count: {} - Time since last change: {:?}", flow_hash, packet_count, time_delta);
// Create message & last_message buffers
let message: Vec<u8> = udp_payload.to_vec();
let last_message = retrieved_payload.unwrap().payload.clone();
// Parse GPO states
buf_ptr = 8;
for i in 0..4 {
gpo_state[i] = u32::from_be_bytes(message[buf_ptr..buf_ptr+4].try_into().unwrap());
last_gpo_state[i] = u32::from_be_bytes(last_message[buf_ptr..buf_ptr+4].try_into().unwrap());
buf_ptr += 4;
}
for i in 0..4 {
for j in 0..32 {
let gpo = (gpo_state[i] >> j) & 1;
let last_gpo = (last_gpo_state[i] >> j) & 1;
if gpo != last_gpo {
info!(root_logger, "{:?} - GPO: {} State: {}",flow_hash, i*32+j+1, gpo);
}
}
}
// Parse Level states
buf_ptr = 24;
for i in 0..10 {
level_state[i] = i16::from_be_bytes(message[buf_ptr..buf_ptr+2].try_into().unwrap());
level_state_f32[i] = level_state[i] as f32;
level_state_f32[i] /= 16.0;
buf_ptr += 2;
}
buf_ptr = 24;
for i in 0..10 {
if message[buf_ptr..buf_ptr+2] != last_message[buf_ptr..buf_ptr+2] {
info!(root_logger, "{:?} - Level: {} State: {}", flow_hash, i+1, level_state_f32[i]);
}
buf_ptr +=2;
}
// Parse Text labels
buf_ptr = 44;
for i in 0..32 {
label[i][..16].copy_from_slice(&message[buf_ptr..buf_ptr+16]);
last_label[i][..16].copy_from_slice(&last_message[buf_ptr..buf_ptr+16]);
buf_ptr += 16;
}
for i in 0..32 {
if label[i] != last_label[i] {
let label = String::from_utf8_lossy(&label[i]);
info!(root_logger, "{:?} - Label {} State: {:?}", flow_hash, i+1, label.to_string());
}
}
// Parse mic arbitration
buf_ptr = 556;
for i in 0..8 {
let nr = message[buf_ptr];
let rumble = message[buf_ptr + 1];
let phantom = message[buf_ptr + 2];
let pad = message[buf_ptr + 3];
let gain = u16::from_be_bytes(message[buf_ptr + 4..buf_ptr + 6].try_into().unwrap());
mic_arb[i] = (nr, rumble, phantom, pad, gain);
let last_nr = last_message[buf_ptr];
let last_rumble = last_message[buf_ptr + 1];
let last_phantom = last_message[buf_ptr + 2];
let last_pad = last_message[buf_ptr + 3];
let last_gain = u16::from_be_bytes(last_message[buf_ptr + 4..buf_ptr + 6].try_into().unwrap());
last_mic_arb[i] = (last_nr, last_rumble, last_phantom, last_pad, last_gain);
buf_ptr += 6;
}
for i in 0..8 {
if mic_arb[i] != last_mic_arb[i] {
info!(root_logger, "{:?} - Mic Arb: {} NR: {} Rumble: {} Phantom: {} Pad: {} Gain: {}", flow_hash, i+1, mic_arb[i].0, mic_arb[i].1, mic_arb[i].2, mic_arb[i].3, mic_arb[i].4);
}
}
}
},
}
let flow_info = FlowInfo {
payload: udp_payload.to_vec(),
packet_count: packet_count,
capture_time_sec: capture_time_sec,
capture_time_usec: capture_time_usec,
};
payloads.insert(flow_hash,flow_info);
}
//Ok(())
cap.next_packet();
cap.list_datalinks();
match cap.next_packet() {
Ok(Packet) => {
println!("Result: {:?}", cap.next_packet());
println!("Stats: {:?}", cap.stats());
},
Err(err) => {
println!("Error: {}", err);
println!("Done");
},
}
}