From d3a74488834d93e9fff737d4c97cc031907a1b4c Mon Sep 17 00:00:00 2001 From: Barbara Bertozzi Date: Thu, 11 Sep 2025 16:18:21 +0200 Subject: [PATCH] Improve conversion from original csv or zip files to parquet with more robust schema definition --- How_to_convert_SP2XR_raw_data_to_parquet.md | 60 +++++ meta_files/config_with_mapping.yaml | 252 ++++++++++++++++++++ meta_files/generate_config.py | 196 ++++++++++++++- src/sp2xr/io.py | 69 ++++-- tests/test_column_mapping.py | 93 ++++++++ 5 files changed, 650 insertions(+), 20 deletions(-) create mode 100644 How_to_convert_SP2XR_raw_data_to_parquet.md create mode 100644 meta_files/config_with_mapping.yaml create mode 100644 tests/test_column_mapping.py diff --git a/How_to_convert_SP2XR_raw_data_to_parquet.md b/How_to_convert_SP2XR_raw_data_to_parquet.md new file mode 100644 index 0000000..ade022e --- /dev/null +++ b/How_to_convert_SP2XR_raw_data_to_parquet.md @@ -0,0 +1,60 @@ +# Conversion of SP2-XR *PbP* and *HK* .csv/.zip files to .parquet + +## Overview +Different SP2-XR instrument versions may use different column names in their CSV/ZIP data files. To ensure consistent data processing, the SP2-XR package now supports automatic column name standardization during CSV to Parquet conversion. + +## How It Works +1. **Input**: Instrument-specific CSV/ZIP files with their original column names +2. **Mapping**: A configuration file maps your column names to canonical (standard) column names +3. **Output**: Parquet files with standardized column names for consistent downstream processing + +## Usage + +### Step 1: Generate Enhanced Config Template +Use `generate_config.py` to create a configuration template: + +```python +from meta_files.generate_config import generate_mapping_template + +# Generate config with column mappings +generate_mapping_template("your_pbp_file.csv", "your_hk_file.csv", "config_with_mapping.yaml") +``` + +### Step 2: Customize Column Mappings +Open the generated `config_with_mapping.yaml` and update the column mappings: + +```yaml +pbp_column_mapping: + # Canonical name -> Your file's column name + Time (sec): "Time_Seconds" # Replace with your actual column name + Particle Flags: "Flags" # Replace with your actual column name + Incand Mass (fg): "Mass_fg" # Replace with your actual column name + # ... etc + +hk_column_mapping: + Time Stamp: "Timestamp" # Replace with your actual column name + Time (sec): "Time_Seconds" # Replace with your actual column name + # ... etc +``` + +### Step 3: Convert CSV to Parquet with Mapping +Use your customized config with the CSV to Parquet conversion: + +```bash +python scripts/sp2xr_csv2parquet.py --source /path/to/csv --target /path/to/parquet --config config_with_mapping.yaml +``` + +## Configuration File Structure + +The enhanced config file contains several sections: + +- **`pbp_schema`** / **`hk_schema`**: Data types for your input files +- **`pbp_canonical_schema`** / **`hk_canonical_schema`**: Standard column schemas used by SP2-XR processing +- **`pbp_column_mapping`** / **`hk_column_mapping`**: Maps canonical names to your file's column names + +## Column Mapping Rules + +1. **Exact matches**: If your column names exactly match canonical names, they're automatically mapped +2. **Custom mapping**: Replace placeholder values with your actual column names +3. **Missing columns**: Set to `null` or remove the line if your data doesn't have that column +4. 
**Extra columns**: Unmapped columns in your files are preserved as-is \ No newline at end of file diff --git a/meta_files/config_with_mapping.yaml b/meta_files/config_with_mapping.yaml new file mode 100644 index 0000000..1b3edf1 --- /dev/null +++ b/meta_files/config_with_mapping.yaml @@ -0,0 +1,252 @@ +'# INSTRUCTIONS': +- This config file contains both schema definitions and column mappings. +- 1. The *_schema sections define the data types for your input files. +- 2. The *_column_mapping sections map your file columns to canonical names. +- 3. Replace placeholder values (YOUR_COLUMN_NAME_FOR_*) with actual column names + from your files. +- 4. If your file doesn't have a particular canonical column, set it to null or remove + the line. +- 5. The output parquet files will use the canonical column names for consistency. +pbp_schema: + Time (sec): float + Packet Time Stamp: float + Flag: int + Dropped Records: int + Record Count: int + Record Size: int + Particle Time Stamp: float + Particle Flags: int + Scatter relPeak: float + Scatter Transit Time: int + Scatter Peak Time: int + Scatter FWHM: int + Scatter Size (nm): float + Incand relPeak: float + Incand Transit Time: float + Incand Peak Time: float + Incand FWHM: float + Incand Delay: float + Incand Mass (fg): float + Reserved: int +hk_schema: + Time Stamp: string + Time (sec): float + Time Stamp (UTC sec): float + Elapsed Time: float + Error Code: int + Packet Time Stamp: float + Laser TEC Temp (C): float + Crystal TEC Temp (C): int + Inlet Air Temp (C): float + Computer Heatsink Temp (C): float + Laser Heatsink Temp (C): float + Outlet Air Temp (C): float + YAG Output Monitor (V): float + Cavity Pressure (hPa): float + Laser Driver Power Monitor (uA): int + Laser Driver Current Limit Monitor (A): float + Laser Driver Current Monitor (A): float + Laser TEC Sense: float + Laser Over Temp (On/Off): int + +5V Laser Rail (V): float + ' +5V Rail (V)': float + +12V Rail (V): float + High Voltage (V): float + Battery Temp (C): float + UPS Output (V): float + 12V Iso Rail (V): float + 5V Iso Rail (V): float + 3.3V Iso Rail (V): float + Spare 22: int + Spare 23: int + 408 Board Spare 0: int + 408 Board Spare 1: int + 408 Board Spare 2: int + 408 Board Spare 3: int + 408 Board Spare 4: int + Purge Flow Monitor (sccm): float + System Input Voltage (V): float + Board Temperature (C): float + 408 Board Spare 8: int + 408 Board Spare 9: int + 408 Board Spare 10: int + 408 Board Spare 11: int + 408 Board Spare 12: int + 408 Board Spare 13: int + 408 Board Spare 14: int + 408 Board Spare 15: int + Sheath Flow Controller Read (vccm): int + Sheath Flow Controller Read (sccm): int + Sheath Flow Controller Pressure (psia): float + Sheath Flow Controller Temperature (C): float + Sample Flow Controller Read (vccm): float + Sample Flow Controller Read (sccm): float + Sample Flow Controller Pressure (psia): float + Sample Flow Controller Temperature (C): float + Fan 1 (RPM): int + Fan 2 (RPM): int + Laser Fan (RPM): int + Spare tach: int + Threshold Crossing Events: int + Dual Qualified Scatter and Incand Particles: int + Qualified Scatter Only Particles: int + Qualified Incand Only Particles: int + Disqualified Due to Scatter Saturation: int + Disqualified Due to Scatter Transit Time Min: int + Disqualified Due to Scatter Transit Time Max: int + Disqualified Due to Scatter FWHM Min: int + Disqualified Due to Scatter FWHM Max: int + Scatter Inter Part Period Min Violation: int + Disqualified Due to Incand Saturation: int + Disqualified Due to Incand Transit 
Time Min: int + Disqualified Due to Incand Transit Time Max: int + Disqualified Due to Incand FWHM Min: int + Disqualified Due to Incand FWHM Max: int + Incand Inter Part Period Min Violation: int + Baseline Sizer Lo: int + Baseline Sizer Hi: int + Baseline Incand Lo: int + Baseline Incand Hi: int + Bandwidth Sizer Hi: int + Bandwidth Sizer Lo: int + Bandwidth Incand Lo: int + Bandwidth Incand Hi: int + ABD-0408 HK ADCs min: int + ABD-0436 HK ADCs min: int + ABD-0408 HK ADCs max: int + ABD-0436 HK ADCs max: int + Incand Particle Conc (cts/ccm): float + Scattering Particle Conc (cts/ccm): float + Incand Mass Conc (fg/sccm): float + Scattering Mass Conc (fg/sccm): float + Sheath Flow Set Point: int + Sample Flow Set Point: int + Laser Temp Set Point: int + Laser Current Set Point: float + Spare 4 Set Point: int + Spare 5 Set Point: int + PMT HV Set Point: float + Particle Density (g/ccm): float + PbP Packet Time: float + Scatter Bin 1: int + Scatter Bin 2: int + Scatter Bin 3: int + Scatter Bin 4: int + Scatter Bin 5: int + Scatter Bin 6: int + Scatter Bin 7: int + Scatter Bin 8: int + Scatter Bin 9: int + Scatter Bin 10: int + Scatter Bin 11: int + Scatter Bin 12: int + Scatter Bin 13: int + Scatter Bin 14: int + Scatter Bin 15: int + Scatter Bin 16: int + Scatter Bin 17: int + Scatter Bin 18: int + Scatter Bin 19: int + Scatter Bin 20: int + Incand Bin 1: int + Incand Bin 2: int + Incand Bin 3: int + Incand Bin 4: int + Incand Bin 5: int + Incand Bin 6: int + Incand Bin 7: int + Incand Bin 8: int + Incand Bin 9: int + Incand Bin 10: int + Incand Bin 11: int + Incand Bin 12: int + Incand Bin 13: int + Incand Bin 14: int + Incand Bin 15: int + Incand Bin 16: int + Incand Bin 17: int + Incand Bin 18: int + Incand Bin 19: int + Incand Bin 20: int +pbp_canonical_schema: + Time (sec): float + Packet Time Stamp: float + Flag: float + Dropped Records: float + Record Count: float + Record Size: float + Particle Time Stamp: float + Particle Flags: float + Scatter relPeak: float + Scatter Transit Time: float + Scatter Peak Time: float + Scatter FWHM: float + Scatter Size (nm): float + Incand relPeak: float + Incand Transit Time: float + Incand Peak Time: float + Incand FWHM: float + Incand Delay: float + Incand Mass (fg): float + Reserved: float +hk_canonical_schema: + Time Stamp: datetime + Time (sec): float + Sample Flow Controller Read (sccm): float + Sample Flow Controller Read (vccm): float + Time Stamp (UTC sec): float + Elapsed Time: float + Error Code: float + Packet Time Stamp: float + Laser TEC Temp (C): float + Crystal TEC Temp (C): float + Inlet Air Temp (C): float + Computer Heatsink Temp (C): float + Laser Heatsink Temp (C): float + Outlet Air Temp (C): float + YAG Output Monitor (V): float + Cavity Pressure (hPa): float + Laser Driver Power Monitor (uA): float + Laser Driver Current Limit Monitor (A): float + Laser Driver Current Monitor (A): float +pbp_column_mapping: + Time (sec): Time (sec) + Packet Time Stamp: Packet Time Stamp + Flag: Flag + Dropped Records: Dropped Records + Record Count: Record Count + Record Size: Record Size + Particle Time Stamp: Particle Time Stamp + Particle Flags: Particle Flags + Scatter relPeak: Scatter relPeak + Scatter Transit Time: Scatter Transit Time + Scatter Peak Time: Scatter Peak Time + Scatter FWHM: Scatter FWHM + Scatter Size (nm): Scatter Size (nm) + Incand relPeak: Incand relPeak + Incand Transit Time: Incand Transit Time + Incand Peak Time: Incand Peak Time + Incand FWHM: Incand FWHM + Incand Delay: Incand Delay + Incand Mass (fg): Incand 
Mass (fg) + Reserved: Reserved +hk_column_mapping: + Time Stamp: Time Stamp + Time (sec): Time (sec) + Sample Flow Controller Read (sccm): Sample Flow Controller Read (sccm) + Sample Flow Controller Read (vccm): Sample Flow Controller Read (vccm) + Time Stamp (UTC sec): Time Stamp (UTC sec) + Elapsed Time: Elapsed Time + Error Code: Error Code + Packet Time Stamp: Packet Time Stamp + Laser TEC Temp (C): Laser TEC Temp (C) + Crystal TEC Temp (C): Crystal TEC Temp (C) + Inlet Air Temp (C): Inlet Air Temp (C) + Computer Heatsink Temp (C): Computer Heatsink Temp (C) + Laser Heatsink Temp (C): Laser Heatsink Temp (C) + Outlet Air Temp (C): Outlet Air Temp (C) + YAG Output Monitor (V): YAG Output Monitor (V) + Cavity Pressure (hPa): Cavity Pressure (hPa) + Laser Driver Power Monitor (uA): Laser Driver Power Monitor (uA) + Laser Driver Current Limit Monitor (A): Laser Driver Current Limit Monitor (A) + Laser Driver Current Monitor (A): Laser Driver Current Monitor (A) diff --git a/meta_files/generate_config.py b/meta_files/generate_config.py index f9d52cf..41bafef 100644 --- a/meta_files/generate_config.py +++ b/meta_files/generate_config.py @@ -1,9 +1,14 @@ +from __future__ import annotations + import pandas as pd import yaml import os +from pathlib import Path +from typing import Any -def infer_general_dtype(dtype): +def infer_general_dtype(dtype: Any) -> str: + """Infer general data type from pandas dtype.""" if pd.api.types.is_integer_dtype(dtype): return "int" elif pd.api.types.is_float_dtype(dtype): @@ -14,10 +19,11 @@ def infer_general_dtype(dtype): return "string" -def load_schema(input_file): - ext = os.path.splitext(input_file)[1].lower() +def load_schema(input_file: str | Path) -> dict[str, str]: + """Load schema from input file by inferring column types.""" + ext = os.path.splitext(str(input_file))[1].lower() - if ext == ".csv": + if ext in [".csv", ".zip"]: df = pd.read_csv(input_file, nrows=100) elif ext == ".parquet": df = pd.read_parquet(input_file) @@ -28,7 +34,62 @@ def load_schema(input_file): return schema -def generate_combined_config(pbp_file, hk_file, output_file="config.yaml"): +def get_canonical_schemas() -> dict[str, dict[str, str]]: + """Return canonical column schemas for SP2XR data.""" + pbp_canonical = { + "Time (sec)": "float", + "Packet Time Stamp": "float", + "Flag": "float", + "Dropped Records": "float", + "Record Count": "float", + "Record Size": "float", + "Particle Time Stamp": "float", + "Particle Flags": "float", + "Scatter relPeak": "float", + "Scatter Transit Time": "float", + "Scatter Peak Time": "float", + "Scatter FWHM": "float", + "Scatter Size (nm)": "float", + "Incand relPeak": "float", + "Incand Transit Time": "float", + "Incand Peak Time": "float", + "Incand FWHM": "float", + "Incand Delay": "float", + "Incand Mass (fg)": "float", + "Reserved": "float", + } + + hk_canonical = { + "Time Stamp": "datetime", + "Time (sec)": "float", + "Sample Flow Controller Read (sccm)": "float", + "Sample Flow Controller Read (vccm)": "float", + # Core HK columns that are commonly used + "Time Stamp (UTC sec)": "float", + "Elapsed Time": "float", + "Error Code": "float", + "Packet Time Stamp": "float", + "Laser TEC Temp (C)": "float", + "Crystal TEC Temp (C)": "float", + "Inlet Air Temp (C)": "float", + "Computer Heatsink Temp (C)": "float", + "Laser Heatsink Temp (C)": "float", + "Outlet Air Temp (C)": "float", + "YAG Output Monitor (V)": "float", + "Cavity Pressure (hPa)": "float", + "Laser Driver Power Monitor (uA)": "float", + "Laser Driver Current Limit Monitor 
(A)": "float", + "Laser Driver Current Monitor (A)": "float", + # ... (other HK columns can be added as needed) + } + + return {"pbp_canonical": pbp_canonical, "hk_canonical": hk_canonical} + + +def generate_combined_config( + pbp_file: str | Path, hk_file: str | Path, output_file: str = "config.yaml" +) -> None: + """Generate config file with both schema definitions and column mappings.""" config = { "pbp_schema": load_schema(pbp_file), "hk_schema": load_schema(hk_file), @@ -40,5 +101,128 @@ def generate_combined_config(pbp_file, hk_file, output_file="config.yaml"): print(f"Unified config saved to: {output_file}") +def generate_mapping_template( + pbp_file: str | Path, + hk_file: str | Path, + output_file: str = "config_with_mapping.yaml", +) -> None: + """ + Generate enhanced config with column mapping templates. + + This creates a config file that allows users to map their instrument-specific + column names to the canonical column names used in the main processing pipeline. + """ + # Load actual file schemas + pbp_schema = load_schema(pbp_file) + hk_schema = load_schema(hk_file) + + # Get canonical schemas + canonical_schemas = get_canonical_schemas() + + # Create column mapping templates + pbp_mapping = {} + hk_mapping = {} + + # For PbP: map file columns to canonical columns + for canonical_col in canonical_schemas["pbp_canonical"]: + # Try to find exact match first + matching_file_col = None + for file_col in pbp_schema.keys(): + if file_col.lower() == canonical_col.lower(): + matching_file_col = file_col + break + + # If exact match found, use it; otherwise leave as template + pbp_mapping[canonical_col] = ( + matching_file_col + or f"YOUR_COLUMN_NAME_FOR_{canonical_col.replace(' ', '_').replace('(', '').replace(')', '').upper()}" + ) + + # For HK: map file columns to canonical columns + for canonical_col in canonical_schemas["hk_canonical"]: + matching_file_col = None + for file_col in hk_schema.keys(): + if file_col.lower() == canonical_col.lower(): + matching_file_col = file_col + break + + hk_mapping[canonical_col] = ( + matching_file_col + or f"YOUR_COLUMN_NAME_FOR_{canonical_col.replace(' ', '_').replace('(', '').replace(')', '').upper()}" + ) + + # Build enhanced config + config = { + "# INSTRUCTIONS": [ + "This config file contains both schema definitions and column mappings.", + "1. The *_schema sections define the data types for your input files.", + "2. The *_column_mapping sections map your file columns to canonical names.", + "3. Replace placeholder values (YOUR_COLUMN_NAME_FOR_*) with actual column names from your files.", + "4. If your file doesn't have a particular canonical column, set it to null or remove the line.", + "5. The output parquet files will use the canonical column names for consistency.", + ], + "pbp_schema": pbp_schema, + "hk_schema": hk_schema, + "pbp_canonical_schema": canonical_schemas["pbp_canonical"], + "hk_canonical_schema": canonical_schemas["hk_canonical"], + "pbp_column_mapping": pbp_mapping, + "hk_column_mapping": hk_mapping, + } + + with open(output_file, "w") as f: + yaml.dump(config, f, sort_keys=False, default_flow_style=False) + + print(f"Enhanced config with column mapping saved to: {output_file}") + print("\nNext steps:") + print( + "1. Open the config file and replace placeholder column mappings with your actual column names" + ) + print( + "2. Remove or set to null any canonical columns that don't exist in your data" + ) + print("3. 
Use this config file with the updated CSV to Parquet conversion process") + + +def apply_column_mapping( + df: pd.DataFrame, column_mapping: dict[str, str | None] +) -> pd.DataFrame: + """ + Apply column name mapping to standardize column names. + + Parameters + ---------- + df : pd.DataFrame + Input dataframe with instrument-specific column names + column_mapping : dict[str, str | None] + Mapping from canonical names to file column names + + Returns + ------- + pd.DataFrame + DataFrame with standardized column names + """ + # Create reverse mapping: file_column_name -> canonical_name + reverse_mapping = {} + for canonical_name, file_column in column_mapping.items(): + if ( + file_column + and file_column in df.columns + and not file_column.startswith("YOUR_COLUMN_NAME_FOR_") + ): + reverse_mapping[file_column] = canonical_name + + # Rename columns using reverse mapping + df_renamed = df.rename(columns=reverse_mapping) + + return df_renamed + + # Example usage -generate_combined_config("pbp_meta.parquet", "hk_meta.parquet") +if __name__ == "__main__": + # Legacy function for backward compatibility + # generate_combined_config("pbp_meta.parquet", "hk_meta.parquet") + + # New enhanced function + pbp_tmp_file = "/data/user/bertoz_b/merlin6data/SP2XR_code/tests/data/mini_SP2XR_PbP_20190409110737_x0001.zip" + hk_tmp_file = "/data/user/bertoz_b/merlin6data/SP2XR_code/tests/data/mini_SP2XR_hk_20190409110737_x0001.zip" + generate_mapping_template(pbp_tmp_file, hk_tmp_file) diff --git a/src/sp2xr/io.py b/src/sp2xr/io.py index 4a82a15..1002b09 100644 --- a/src/sp2xr/io.py +++ b/src/sp2xr/io.py @@ -40,11 +40,27 @@ def csv_to_parquet( df.to_parquet(parquet_path, index=False) -def read_sp2xr_csv(file_path: str, schema: dict[str, Any]) -> dd.DataFrame: +def read_sp2xr_csv( + file_path: str, + schema: dict[str, Any], + column_mapping: dict[str, str | None] | None = None, +) -> dd.DataFrame: """ Read a SP2XR CSV/ZIP file with given schema from config. - Returns a Dask DataFrame. 
+    Parameters
+    ----------
+    file_path : str
+        Path to the CSV/ZIP file
+    schema : dict[str, Any]
+        Schema definition with column types
+    column_mapping : dict[str, str | None] | None
+        Optional mapping from canonical column names to file column names
+
+    Returns
+    -------
+    dd.DataFrame
+        Dask DataFrame with standardized column names
     """
     datetime_cols = [col for col, typ in schema.items() if typ == "datetime"]
     dtype_cols = {col: typ for col, typ in schema.items() if typ != "datetime"}
@@ -56,6 +72,27 @@ def read_sp2xr_csv(file_path: str, schema: dict[str, Any]) -> dd.DataFrame:
         blocksize=None,
         assume_missing=True,
     )
+
+    # Apply column mapping if provided
+    if column_mapping:
+        # Build the reverse mapping (file column name -> canonical name),
+        # skipping unfilled placeholders and columns absent from the data
+        reverse_mapping = {
+            file_column: canonical_name
+            for canonical_name, file_column in column_mapping.items()
+            if file_column
+            and file_column in df.columns
+            and not file_column.startswith("YOUR_COLUMN_NAME_FOR_")
+        }
+        # Rename at the Dask level so the DataFrame metadata stays in sync
+        # with the renamed partitions
+        df = df.rename(columns=reverse_mapping)
+
     return df


@@ -161,7 +198,7 @@ def save_sp2xr_parquet(df, file_path, target_directory):
     )


-def process_sp2xr_file(file_path, config_path, target_directory):
+def process_sp2xr_file(file_path: str, config_path: str, target_directory: str) -> None:
     """
     This function reads Pbp or HK files from the SP2XR
@@ -169,15 +206,16 @@ def process_sp2xr_file(file_path, config_path, target_directory):
     ----------
     file_path : str
         Complete path of the file to read.
-    meta : pandas DataFrame
-        Empty pandas dataframe with the structure expected for the file that is read.
-        This is ised in case the file is empty --> The function will return an empty DataFrame
-        with this structure.
+    config_path : str
+        Path to the YAML configuration file containing schema definitions
+        and optional column mappings.
+    target_directory : str
+        Directory where the output parquet files will be saved.

     Returns
     -------
-    Dask DataFrame
-        Content of the file as Dask DataFrame.
+    None
+        The converted data is written as parquet files to ``target_directory``.
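+
+    Examples
+    --------
+    Convert one raw PbP file with a config that includes column mappings
+    (the paths below are illustrative)::
+
+        process_sp2xr_file(
+            "raw/SP2XR_PbP_20190409110737_x0001.zip",
+            "meta_files/config_with_mapping.yaml",
+            "parquet_output",
+        )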
""" if not os.path.exists(file_path): @@ -187,14 +225,14 @@ def process_sp2xr_file(file_path, config_path, target_directory): raise ValueError("No config file found.") with open(config_path) as f: - schema = yaml.safe_load(f) + config = yaml.safe_load(f) tmp_hk = load_matching_hk_file(file_path) if tmp_hk.empty: warnings.warn("tmp_hk empty or not existing") df = pd.DataFrame() - # pbp_schema = schema["pbp_schema"] - # hk_schema = schema["hk_schema"] + return df + first_val, t0 = tmp_hk[["Time (sec)", "Time Stamp"]].values[0] def postprocess_hk(df): @@ -203,19 +241,22 @@ def process_sp2xr_file(file_path, config_path, target_directory): def postprocess_pbp(df): return df.fillna(0) + # Determine file type and get appropriate schema and mapping if "PbP" in file_path: - file_schema = schema["pbp_schema"] + file_schema = config["pbp_schema"] + column_mapping = config.get("pbp_column_mapping", None) postprocess = postprocess_pbp elif "hk" in file_path: - file_schema = schema["hk_schema"] + file_schema = config["hk_schema"] + column_mapping = config.get("hk_column_mapping", None) postprocess = postprocess_hk else: raise ValueError(f"Unrecognized file type: {file_path}") try: - df = read_sp2xr_csv(file_path, file_schema) + df = read_sp2xr_csv(file_path, file_schema, column_mapping) df = postprocess(df) except Exception as e: logger.error(f"[{file_path}] Unexpected error: {e}") diff --git a/tests/test_column_mapping.py b/tests/test_column_mapping.py new file mode 100644 index 0000000..0742372 --- /dev/null +++ b/tests/test_column_mapping.py @@ -0,0 +1,93 @@ +"""Tests for the column mapping functionality.""" + +from __future__ import annotations + +import pandas as pd +import sys +from pathlib import Path + +# Add meta_files to path +sys.path.insert(0, str(Path(__file__).parent.parent / "meta_files")) +from generate_config import apply_column_mapping, get_canonical_schemas + + +def test_apply_column_mapping(): + """Test that column mapping correctly renames columns.""" + # Create test DataFrame with instrument-specific column names + test_df = pd.DataFrame( + { + "Old_Time_Sec": [1.0, 2.0, 3.0], + "Old_Particle_Flags": [0, 1, 0], + "Other_Column": ["a", "b", "c"], + } + ) + + # Create mapping from canonical to file column names + test_mapping = { + "Time (sec)": "Old_Time_Sec", + "Particle Flags": "Old_Particle_Flags", + "Nonexistent Column": "YOUR_COLUMN_NAME_FOR_NONEXISTENT", + "Missing Column": None, + } + + result = apply_column_mapping(test_df, test_mapping) + + # Check that columns were renamed correctly + assert "Time (sec)" in result.columns + assert "Particle Flags" in result.columns + assert "Other_Column" in result.columns # Unmapped columns should remain + + # Check that placeholder and None mappings were ignored + assert "YOUR_COLUMN_NAME_FOR_NONEXISTENT" not in result.columns + assert "Nonexistent Column" not in result.columns + + # Check that data was preserved + assert result["Time (sec)"].tolist() == [1.0, 2.0, 3.0] + assert result["Particle Flags"].tolist() == [0, 1, 0] + + +def test_get_canonical_schemas(): + """Test that canonical schemas contain expected columns.""" + schemas = get_canonical_schemas() + + # Check that both PbP and HK canonical schemas exist + assert "pbp_canonical" in schemas + assert "hk_canonical" in schemas + + # Check for key columns that should be present + pbp_canonical = schemas["pbp_canonical"] + assert "Time (sec)" in pbp_canonical + assert "Particle Flags" in pbp_canonical + assert "Incand Mass (fg)" in pbp_canonical + + hk_canonical = 
schemas["hk_canonical"] + assert "Time Stamp" in hk_canonical + assert "Time (sec)" in hk_canonical + assert "Sample Flow Controller Read (sccm)" in hk_canonical + + +def test_column_mapping_empty_dataframe(): + """Test column mapping with empty DataFrame.""" + empty_df = pd.DataFrame() + mapping = {"Time (sec)": "Old_Time_Sec"} + + result = apply_column_mapping(empty_df, mapping) + assert result.empty + + +def test_column_mapping_no_mapping(): + """Test column mapping when no mapping is provided.""" + test_df = pd.DataFrame({"col1": [1, 2], "col2": [3, 4]}) + + result = apply_column_mapping(test_df, {}) + + # Should return original DataFrame unchanged + pd.testing.assert_frame_equal(result, test_df) + + +if __name__ == "__main__": + test_apply_column_mapping() + test_get_canonical_schemas() + test_column_mapping_empty_dataframe() + test_column_mapping_no_mapping() + print("All tests passed!")