Csv parsing with multiple header data pairs and making generic

I have csv data which I have saved as a csv file but here is the data for context:

C,NEMP.WORLD,TRADINGIS,AEMO,PUBLIC,2024/03/03,13:30:11,0000000412683134,TRADINGIS,0000000412683133
I,TRADING,INTERCONNECTORRES,2,SETTLEMENTDATE,RUNNO,INTERCONNECTORID,PERIODID,METEREDMWFLOW,MWFLOW,MWLOSSES,LASTCHANGED
D,TRADING,INTERCONNECTORRES,2,"2024/03/03 13:35:00",1,N-Q-MNSP1,163,36.2,17,1.36,"2024/03/03 13:30:04"
D,TRADING,INTERCONNECTORRES,2,"2024/03/03 13:35:00",1,NSW1-QLD1,163,587,464.75,30.31,"2024/03/03 13:30:04"
D,TRADING,INTERCONNECTORRES,2,"2024/03/03 13:35:00",1,T-V-MNSP1,163,-407.8,-441,21.94,"2024/03/03 13:30:04"
D,TRADING,INTERCONNECTORRES,2,"2024/03/03 13:35:00",1,V-S-MNSP1,163,156,156.22,15.93,"2024/03/03 13:30:04"
D,TRADING,INTERCONNECTORRES,2,"2024/03/03 13:35:00",1,V-SA,163,-224.44,-202.19,4.53,"2024/03/03 13:30:04"
D,TRADING,INTERCONNECTORRES,2,"2024/03/03 13:35:00",1,VIC1-NSW1,163,-31.1,-392.37,-23.81,"2024/03/03 13:30:04"
I,TRADING,PRICE,3,SETTLEMENTDATE,RUNNO,REGIONID,PERIODID,RRP,EEP,INVALIDFLAG,LASTCHANGED,ROP,RAISE6SECRRP,RAISE6SECROP,RAISE60SECRRP,RAISE60SECROP,RAISE5MINRRP,RAISE5MINROP,RAISEREGRRP,RAISEREGROP,LOWER6SECRRP,LOWER6SECROP,LOWER60SECRRP,LOWER60SECROP,LOWER5MINRRP,LOWER5MINROP,LOWERREGRRP,LOWERREGROP,RAISE1SECRRP,RAISE1SECROP,LOWER1SECRRP,LOWER1SECROP,PRICE_STATUS
D,TRADING,PRICE,3,"2024/03/03 13:35:00",1,SA1,163,-63.45,0,0,"2024/03/03 13:30:04",-63.45,0,0,0,0,0,0,0.91,0.91,1.84,1.84,4.78,4.78,0.39,0.39,3.76,3.76,0,0,0,0,FIRM
D,TRADING,PRICE,3,"2024/03/03 13:35:00",1,NSW1,163,77.06,0,0,"2024/03/03 13:30:04",77.06,0,0,0,0,0,0,0.91,0.91,1.84,1.84,4.78,4.78,0.39,0.39,3.76,3.76,0,0,0,0,FIRM
D,TRADING,PRICE,3,"2024/03/03 13:35:00",1,QLD1,163,86,0,0,"2024/03/03 13:30:04",86,0,0,0,0,0,0,0.91,0.91,1.84,1.84,4.78,4.78,0.39,0.39,3.76,3.76,0,0,0,0,FIRM
D,TRADING,PRICE,3,"2024/03/03 13:35:00",1,TAS1,163,-40.01,0,0,"2024/03/03 13:30:04",-40.01,0.38,0.38,0.38,0.38,0.38,0.38,6.25,6.25,1.84,1.84,4.78,4.78,0,0,105.27,105.27,0,0,0,0,FIRM
D,TRADING,PRICE,3,"2024/03/03 13:35:00",1,VIC1,163,-67.1,0,0,"2024/03/03 13:30:04",-67.1,0,0,0,0,0,0,0.91,0.91,1.84,1.84,4.78,4.78,0.39,0.39,3.76,3.76,0,0,0,0,FIRM
C,"END OF REPORT",15

Each csv contains an ordered identifier C, I, D, C in first column. Heres my best guess at what they mean:

  • C: "Comment" or "Control" for comment or a control line that provides metadata.
  • I: "Information" or "Instruction". More specific list of headers used in the data
  • D: "Data" containing the actual data entries.
  • C (again): Another C line at the end of the data might indicate the end of the report or dataset.

The first 3 values for columns starting with I are unique for each set of data e.g:
"I,TRADING,INTERCONNECTORRES" is unique for struct InterconnectorData

This CSV contains multiple pairs of Information and Data for example:

C, # Control
I, # headers
D, # some data
D, # some more data
I, # new headers
D, # different data
C, # Control

2 questions:

  1. How to use account for potentially missing headers evident from I column?
  2. How to make implement this logic as a factory where RecordsFromCsv is a trait implemented for all csv struct data to be parsed:
let records: RecordsFromCsv = process_csv(filename, match, vec![&InterconnectorData, &PriceData]);

Here is my current code:

use std::fmt;
use csv::ReaderBuilder;
use serde::Deserialize;
use std::error::Error;
use std::fs::File;
use std::io::{BufRead, BufReader};

#[derive(Debug, Deserialize, Default)]
struct InterconnectorData {
    #[serde(rename = "ROW_TYPE")]
    row_type: String,
    #[serde(rename = "FILE_TYPE")]
    file_type: String,
    #[serde(rename = "FILE_SUBTYPE")]
    file_subtype: String,
    #[serde(rename = "FILE_DESCRIPTOR")]
    file_descriptor: String,
    #[serde(rename = "SETTLEMENTDATE")]
    settlement_date: Option<String>,
    #[serde(rename = "RUNNO")]
    run_no: Option<u32>,
    #[serde(rename = "INTERCONNECTORID")]
    interconnector_id: Option<String>,
    #[serde(rename = "PERIODID")]
    period_id: Option<u32>,
    #[serde(rename = "METEREDMWFLOW")]
    metered_mw_flow: Option<f64>,
    #[serde(rename = "MWFLOW")]
    mw_flow: Option<f64>,
    #[serde(rename = "MWLOSSES")]
    mw_losses: Option<f64>,
    #[serde(rename = "LASTCHANGED")]
    last_changed: Option<String>,
}

#[derive(Debug, Default, Deserialize)]
struct PriceData {
    #[serde(rename = "ROW_TYPE")]
    row_type: String,
    #[serde(rename = "FILE_TYPE")]
    file_type: String,
    #[serde(rename = "FILE_SUBTYPE")]
    file_subtype: String,
    #[serde(rename = "FILE_DESCRIPTOR")]
    file_descriptor: String,
    #[serde(rename = "SETTLEMENT_DATE")]
    settlement_date: Option<String>,
    #[serde(rename = "RUN_NO")]
    run_no: Option<u32>,
    #[serde(rename = "REGION_ID")]
    region_id: Option<String>,
    #[serde(rename = "PERIOD_ID")]
    period_id: Option<u32>,
    #[serde(rename = "RRP")]
    rrp: Option<f64>,
    #[serde(rename = "EEP")]
    eep: Option<f64>,
    #[serde(rename = "INVALID_FLAG")]
    invalid_flag: Option<u32>,
    #[serde(rename = "LAST_CHANGED")]
    last_changed: Option<String>,
    #[serde(rename = "ROP")]
    rop: Option<f64>,
    #[serde(rename = "RAISE6SEC_RRP")]
    raise6sec_rrp: Option<f64>,
    #[serde(rename = "RAISE6SEC_ROP")]
    raise6sec_rop: Option<f64>,
    #[serde(rename = "RAISE60SEC_RRP")]
    raise60sec_rrp: Option<f64>,
    #[serde(rename = "RAISE60SEC_ROP")]
    raise60sec_rop: Option<f64>,
    #[serde(rename = "RAISE5MIN_RRP")]
    raise5min_rrp: Option<f64>,
    #[serde(rename = "RAISE5MIN_ROP")]
    raise5min_rop: Option<f64>,
    #[serde(rename = "RAISEREG_RRP")]
    raisereg_rrp: Option<f64>,
    #[serde(rename = "RAISEREG_ROP")]
    raisereg_rop: Option<f64>,
    #[serde(rename = "LOWER6SEC_RRP")]
    lower6sec_rrp: Option<f64>,
    #[serde(rename = "LOWER6SEC_ROP")]
    lower6sec_rop: Option<f64>,
    #[serde(rename = "LOWER60SEC_RRP")]
    lower60sec_rrp: Option<f64>,
    #[serde(rename = "LOWER60SEC_ROP")]
    lower60sec_rop: Option<f64>,
    #[serde(rename = "LOWER5MIN_RRP")]
    lower5min_rrp: Option<f64>,
    #[serde(rename = "LOWER5MIN_ROP")]
    lower5min_rop: Option<f64>,
    #[serde(rename = "LOWERREG_RRP")]
    lowerreg_rrp: Option<f64>,
    #[serde(rename = "LOWERREG_ROP")]
    lowerreg_rop: Option<f64>,
    #[serde(rename = "RAISE1SEC_RRP")]
    raise1sec_rrp: Option<f64>,
    #[serde(rename = "RAISE1SEC_ROP")]
    raise1sec_rop: Option<f64>,
    #[serde(rename = "LOWER1SEC_RRP")]
    lower1sec_rrp: Option<f64>,
    #[serde(rename = "LOWER1SEC_ROP")]
    lower1sec_rop: Option<f64>,
    #[serde(rename = "PRICE_STATUS")]
    price_status: Option<String>,
}

#[derive(Debug, Deserialize)]
#[serde(untagged)]
enum RecordCurrentTradingIs {
    Variant1(InterconnectorData),
    Variant2(PriceData),
}

impl fmt::Display for InterconnectorData {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "InterconnectorData: row_type: {}, file_type: {}, file_subtype: {}, file_descriptor: {}, settlement_date: {:?}, run_no: {:?}, interconnector_id: {:?}, period_id: {:?}, metered_mw_flow: {:?}, mw_flow: {:?}, mw_losses: {:?}, last_changed: {:?}",
            self.row_type, self.file_type, self.file_subtype, self.file_descriptor,
            self.settlement_date, self.run_no, self.interconnector_id, self.period_id,
            self.metered_mw_flow, self.mw_flow, self.mw_losses, self.last_changed)
    }
}

// Implement Display for PriceData
impl fmt::Display for PriceData {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "PriceData: row_type: {}, file_type: {}, file_subtype: {}, file_descriptor: {}, settlement_date: {:?}, run_no: {:?}, region_id: {:?}, period_id: {:?}, rrp: {:?}, eep: {:?}, invalid_flag: {:?}, last_changed: {:?}, rop: {:?}, raise6sec_rrp: {:?}, raise6sec_rop: {:?}, raise60sec_rrp: {:?}, raise60sec_rop: {:?}, raise5min_rrp: {:?}, raise5min_rop: {:?}, raisereg_rrp: {:?}, raisereg_rop: {:?}, lower6sec_rrp: {:?}, lower6sec_rop: {:?}, lower60sec_rrp: {:?}, lower60sec_rop: {:?}, lower5min_rrp: {:?}, lower5min_rop: {:?}, lowerreg_rrp: {:?}, lowerreg_rop: {:?}, raise1sec_rrp: {:?}, raise1sec_rop: {:?}, lower1sec_rrp: {:?}, lower1sec_rop: {:?}, price_status: {:?}",
            self.row_type, self.file_type, self.file_subtype, self.file_descriptor,
            self.settlement_date, self.run_no, self.region_id, self.period_id,
            self.rrp, self.eep, self.invalid_flag, self.last_changed,
            self.rop, self.raise6sec_rrp, self.raise6sec_rop, self.raise60sec_rrp, self.raise60sec_rop,
            self.raise5min_rrp, self.raise5min_rop, self.raisereg_rrp, self.raisereg_rop,
            self.lower6sec_rrp, self.lower6sec_rop, self.lower60sec_rrp, self.lower60sec_rop,
            self.lower5min_rrp, self.lower5min_rop, self.lowerreg_rrp, self.lowerreg_rop,
            self.raise1sec_rrp, self.raise1sec_rop, self.lower1sec_rrp, self.lower1sec_rop,
            self.price_status)
    }
}

// Implement Display for RecordCurrentTradingIs
impl fmt::Display for RecordCurrentTradingIs {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            RecordCurrentTradingIs::Variant1(data) => write!(f, "{}", data),
            RecordCurrentTradingIs::Variant2(data) => write!(f, "{}", data),
        }
    }
}

trait ProcessRecord<T> {
    fn process(line: &str) -> Result<T, Box<dyn Error>>;
}

impl ProcessRecord<RecordCurrentTradingIs> for InterconnectorData {
    fn process(line: &str) -> Result<RecordCurrentTradingIs, Box<dyn Error>> {
        let mut rdr = ReaderBuilder::new()
            .has_headers(false)
            .from_reader(line.as_bytes());
        let record = rdr
            .deserialize::<InterconnectorData>()
            .next()
            .ok_or("No record found")??;
        Ok(RecordCurrentTradingIs::Variant1(record))
    }
}

impl ProcessRecord<RecordCurrentTradingIs> for PriceData {
    fn process(line: &str) -> Result<RecordCurrentTradingIs, Box<dyn Error>> {
        let mut rdr = ReaderBuilder::new()
            .has_headers(false)
            .from_reader(line.as_bytes());
        let record = rdr
            .deserialize::<PriceData>()
            .next()
            .ok_or("No record found")??;
        Ok(RecordCurrentTradingIs::Variant2(record))
    }
}

fn process_file_current_trading_is(
    path: &str,
) -> Result<Vec<RecordCurrentTradingIs>, Box<dyn Error>> {
    let file = File::open(path)?;
    let reader = BufReader::new(file);
    let mut records: Vec<RecordCurrentTradingIs> = Vec::new();

    for line_result in reader.lines() {
        let line = line_result?;
        if line.starts_with("I,TRADING,INTERCONNECTORRES") || line.starts_with("I,TRADING,PRICE") {
            println!("{:?}", &line);
        }
        match line.chars().next() {
            Some('C') | Some('I') => continue,
            Some('D') => {
                if line.starts_with("D,TRADING,INTERCONNECTORRES") {
                    records.push(InterconnectorData::process(&line)?);
                } else if line.starts_with("D,TRADING,PRICE") {
                    records.push(PriceData::process(&line)?);
                } else {
                    return Err("Unknown record type".into());
                }
            }
            _ => return Err("Invalid line format".into()),
        }
    }

    Ok(records)
}

fn main() -> Result<(), Box<dyn Error>> {
    let path = "src/fixtures/PUBLIC_TRADINGIS_202403031335_0000000412683134.CSV";
    let records = process_file_current_trading_is(path)?;
    for record in records {
        println!("{}", record);
    }

    Ok(())
}

when trying to implement polymorphism it looked like this but it got way complicated and over my head when making the headers dynamic:

trait DataProcessor {
    fn matches(&self, data: &str) -> bool;
    fn process(&self);
}

trait DataProcessorFactory {
    fn from_data_row(data: &str) -> Box<dyn DataProcessor>;
}

// Implement DataProcessorFactory for each type
impl DataProcessorFactory for InterconnectorData {
    fn from_data_row(data: &str) -> Box<dyn DataProcessor> {
        // parsing &str data as parts and reassembling Box of InterconnectorData data
    }
}

impl DataProcessorFactory for PriceData {
    fn from_data_row(data: &str) -> Box<dyn DataProcessor> {
        // parsing &str data as parts and reassembling Box of Price data
    }
}

fn data_processor_factory(
    data: &str,
    factories: &[&dyn DataProcessorFactory],
) -> Option<Box<dyn DataProcessor>> {
    for factory in factories {
        // Use a temporary instance to check if it matches
        // This is bad since every row requires instance checks rather than using state of most recent I column
        let temp_instance = factory.from_data_row("");
        if temp_instance.matches(data) {
            return Some(factory.from_data_row(data));
        }
    }
    None
}

fn main() {
    let data_rows = vec![
        // update with csv data
    ];

    let factories: Vec<&dyn DataProcessorFactory> =
        vec![&InterconnectorData::default(), &PriceData::default()];

    for data_row in data_rows {
        if let Some(processor) = data_processor_factory(data_row, &factories) {
            processor.process();
        } else {
            println!("No processor found for row: {}", data_row);
        }
    }
}

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.