Improve conversion from original csv or zip files to parquet with more robust schema definition

How_to_convert_SP2XR_raw_data_to_parquet.md (new file, 60 lines)
@@ -0,0 +1,60 @@
# Conversion of SP2-XR *PbP* and *HK* .csv/.zip files to .parquet

## Overview

Different SP2-XR instrument versions may use different column names in their CSV/ZIP data files. To ensure consistent data processing, the SP2-XR package now supports automatic column name standardization during CSV to Parquet conversion.

## How It Works

1. **Input**: Instrument-specific CSV/ZIP files with their original column names
2. **Mapping**: A configuration file maps your column names to canonical (standard) column names (see the sketch below)
3. **Output**: Parquet files with standardized column names for consistent downstream processing
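
Conceptually, the standardization is a plain column rename. A minimal pandas sketch of the idea (the instrument-specific names here are hypothetical): the config maps *canonical name → your file's name*, so the conversion applies the mapping in reverse.

```python
import pandas as pd

# Hypothetical raw PbP data with instrument-specific column names
raw = pd.DataFrame({"Time_Seconds": [1.0, 2.0], "Flags": [0, 1]})

# The config maps canonical names to your file's names ...
mapping = {"Time (sec)": "Time_Seconds", "Particle Flags": "Flags"}

# ... so the rename is applied in the reverse direction (file -> canonical)
standardized = raw.rename(columns={v: k for k, v in mapping.items()})
print(list(standardized.columns))  # ['Time (sec)', 'Particle Flags']
```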

## Usage

### Step 1: Generate Enhanced Config Template

Use `generate_config.py` to create a configuration template:

```python
from meta_files.generate_config import generate_mapping_template

# Generate config with column mappings
generate_mapping_template("your_pbp_file.csv", "your_hk_file.csv", "config_with_mapping.yaml")
```

### Step 2: Customize Column Mappings

Open the generated `config_with_mapping.yaml` and update the column mappings:

```yaml
pbp_column_mapping:
  # Canonical name -> Your file's column name
  Time (sec): "Time_Seconds"       # Replace with your actual column name
  Particle Flags: "Flags"          # Replace with your actual column name
  Incand Mass (fg): "Mass_fg"      # Replace with your actual column name
  # ... etc

hk_column_mapping:
  Time Stamp: "Timestamp"          # Replace with your actual column name
  Time (sec): "Time_Seconds"       # Replace with your actual column name
  # ... etc
```

### Step 3: Convert CSV to Parquet with Mapping

Use your customized config with the CSV to Parquet conversion:

```bash
python scripts/sp2xr_csv2parquet.py --source /path/to/csv --target /path/to/parquet --config config_with_mapping.yaml
```
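
The same conversion can also be driven per file from Python through `process_sp2xr_file`, which loads the config, applies the mapping, and writes the parquet output. A minimal sketch; the import path is an assumption and may need adjusting to how `scripts/sp2xr_csv2parquet.py` is exposed in your environment:

```python
# Hypothetical programmatic equivalent of the CLI call above
from sp2xr_csv2parquet import process_sp2xr_file

process_sp2xr_file(
    file_path="/path/to/csv/SP2XR_PbP_20190409110737_x0001.zip",
    config_path="config_with_mapping.yaml",
    target_directory="/path/to/parquet",
)
```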

## Configuration File Structure

The enhanced config file contains several sections:

- **`pbp_schema`** / **`hk_schema`**: Data types for your input files
- **`pbp_canonical_schema`** / **`hk_canonical_schema`**: Standard column schemas used by SP2-XR processing
- **`pbp_column_mapping`** / **`hk_column_mapping`**: Maps canonical names to your file's column names

## Column Mapping Rules

1. **Exact matches**: If your column names exactly match canonical names, they're mapped automatically
2. **Custom mapping**: Replace placeholder values with your actual column names (see the example below)
3. **Missing columns**: Set the value to `null`, or remove the line, if your data doesn't have that column
4. **Extra columns**: Unmapped columns in your files are preserved as-is

meta_files/config_with_mapping.yaml (new file, 252 lines)
@@ -0,0 +1,252 @@
'# INSTRUCTIONS':
- This config file contains both schema definitions and column mappings.
- 1. The *_schema sections define the data types for your input files.
- 2. The *_column_mapping sections map your file columns to canonical names.
- 3. Replace placeholder values (YOUR_COLUMN_NAME_FOR_*) with actual column names
  from your files.
- 4. If your file doesn't have a particular canonical column, set it to null or remove
  the line.
- 5. The output parquet files will use the canonical column names for consistency.
pbp_schema:
  Time (sec): float
  Packet Time Stamp: float
  Flag: int
  Dropped Records: int
  Record Count: int
  Record Size: int
  Particle Time Stamp: float
  Particle Flags: int
  Scatter relPeak: float
  Scatter Transit Time: int
  Scatter Peak Time: int
  Scatter FWHM: int
  Scatter Size (nm): float
  Incand relPeak: float
  Incand Transit Time: float
  Incand Peak Time: float
  Incand FWHM: float
  Incand Delay: float
  Incand Mass (fg): float
  Reserved: int
hk_schema:
  Time Stamp: string
  Time (sec): float
  Time Stamp (UTC sec): float
  Elapsed Time: float
  Error Code: int
  Packet Time Stamp: float
  Laser TEC Temp (C): float
  Crystal TEC Temp (C): int
  Inlet Air Temp (C): float
  Computer Heatsink Temp (C): float
  Laser Heatsink Temp (C): float
  Outlet Air Temp (C): float
  YAG Output Monitor (V): float
  Cavity Pressure (hPa): float
  Laser Driver Power Monitor (uA): int
  Laser Driver Current Limit Monitor (A): float
  Laser Driver Current Monitor (A): float
  Laser TEC Sense: float
  Laser Over Temp (On/Off): int
  +5V Laser Rail (V): float
  ' +5V Rail (V)': float
  +12V Rail (V): float
  High Voltage (V): float
  Battery Temp (C): float
  UPS Output (V): float
  12V Iso Rail (V): float
  5V Iso Rail (V): float
  3.3V Iso Rail (V): float
  Spare 22: int
  Spare 23: int
  408 Board Spare 0: int
  408 Board Spare 1: int
  408 Board Spare 2: int
  408 Board Spare 3: int
  408 Board Spare 4: int
  Purge Flow Monitor (sccm): float
  System Input Voltage (V): float
  Board Temperature (C): float
  408 Board Spare 8: int
  408 Board Spare 9: int
  408 Board Spare 10: int
  408 Board Spare 11: int
  408 Board Spare 12: int
  408 Board Spare 13: int
  408 Board Spare 14: int
  408 Board Spare 15: int
  Sheath Flow Controller Read (vccm): int
  Sheath Flow Controller Read (sccm): int
  Sheath Flow Controller Pressure (psia): float
  Sheath Flow Controller Temperature (C): float
  Sample Flow Controller Read (vccm): float
  Sample Flow Controller Read (sccm): float
  Sample Flow Controller Pressure (psia): float
  Sample Flow Controller Temperature (C): float
  Fan 1 (RPM): int
  Fan 2 (RPM): int
  Laser Fan (RPM): int
  Spare tach: int
  Threshold Crossing Events: int
  Dual Qualified Scatter and Incand Particles: int
  Qualified Scatter Only Particles: int
  Qualified Incand Only Particles: int
  Disqualified Due to Scatter Saturation: int
  Disqualified Due to Scatter Transit Time Min: int
  Disqualified Due to Scatter Transit Time Max: int
  Disqualified Due to Scatter FWHM Min: int
  Disqualified Due to Scatter FWHM Max: int
  Scatter Inter Part Period Min Violation: int
  Disqualified Due to Incand Saturation: int
  Disqualified Due to Incand Transit Time Min: int
  Disqualified Due to Incand Transit Time Max: int
  Disqualified Due to Incand FWHM Min: int
  Disqualified Due to Incand FWHM Max: int
  Incand Inter Part Period Min Violation: int
  Baseline Sizer Lo: int
  Baseline Sizer Hi: int
  Baseline Incand Lo: int
  Baseline Incand Hi: int
  Bandwidth Sizer Hi: int
  Bandwidth Sizer Lo: int
  Bandwidth Incand Lo: int
  Bandwidth Incand Hi: int
  ABD-0408 HK ADCs min: int
  ABD-0436 HK ADCs min: int
  ABD-0408 HK ADCs max: int
  ABD-0436 HK ADCs max: int
  Incand Particle Conc (cts/ccm): float
  Scattering Particle Conc (cts/ccm): float
  Incand Mass Conc (fg/sccm): float
  Scattering Mass Conc (fg/sccm): float
  Sheath Flow Set Point: int
  Sample Flow Set Point: int
  Laser Temp Set Point: int
  Laser Current Set Point: float
  Spare 4 Set Point: int
  Spare 5 Set Point: int
  PMT HV Set Point: float
  Particle Density (g/ccm): float
  PbP Packet Time: float
  Scatter Bin 1: int
  Scatter Bin 2: int
  Scatter Bin 3: int
  Scatter Bin 4: int
  Scatter Bin 5: int
  Scatter Bin 6: int
  Scatter Bin 7: int
  Scatter Bin 8: int
  Scatter Bin 9: int
  Scatter Bin 10: int
  Scatter Bin 11: int
  Scatter Bin 12: int
  Scatter Bin 13: int
  Scatter Bin 14: int
  Scatter Bin 15: int
  Scatter Bin 16: int
  Scatter Bin 17: int
  Scatter Bin 18: int
  Scatter Bin 19: int
  Scatter Bin 20: int
  Incand Bin 1: int
  Incand Bin 2: int
  Incand Bin 3: int
  Incand Bin 4: int
  Incand Bin 5: int
  Incand Bin 6: int
  Incand Bin 7: int
  Incand Bin 8: int
  Incand Bin 9: int
  Incand Bin 10: int
  Incand Bin 11: int
  Incand Bin 12: int
  Incand Bin 13: int
  Incand Bin 14: int
  Incand Bin 15: int
  Incand Bin 16: int
  Incand Bin 17: int
  Incand Bin 18: int
  Incand Bin 19: int
  Incand Bin 20: int
pbp_canonical_schema:
  Time (sec): float
  Packet Time Stamp: float
  Flag: float
  Dropped Records: float
  Record Count: float
  Record Size: float
  Particle Time Stamp: float
  Particle Flags: float
  Scatter relPeak: float
  Scatter Transit Time: float
  Scatter Peak Time: float
  Scatter FWHM: float
  Scatter Size (nm): float
  Incand relPeak: float
  Incand Transit Time: float
  Incand Peak Time: float
  Incand FWHM: float
  Incand Delay: float
  Incand Mass (fg): float
  Reserved: float
hk_canonical_schema:
  Time Stamp: datetime
  Time (sec): float
  Sample Flow Controller Read (sccm): float
  Sample Flow Controller Read (vccm): float
  Time Stamp (UTC sec): float
  Elapsed Time: float
  Error Code: float
  Packet Time Stamp: float
  Laser TEC Temp (C): float
  Crystal TEC Temp (C): float
  Inlet Air Temp (C): float
  Computer Heatsink Temp (C): float
  Laser Heatsink Temp (C): float
  Outlet Air Temp (C): float
  YAG Output Monitor (V): float
  Cavity Pressure (hPa): float
  Laser Driver Power Monitor (uA): float
  Laser Driver Current Limit Monitor (A): float
  Laser Driver Current Monitor (A): float
pbp_column_mapping:
  Time (sec): Time (sec)
  Packet Time Stamp: Packet Time Stamp
  Flag: Flag
  Dropped Records: Dropped Records
  Record Count: Record Count
  Record Size: Record Size
  Particle Time Stamp: Particle Time Stamp
  Particle Flags: Particle Flags
  Scatter relPeak: Scatter relPeak
  Scatter Transit Time: Scatter Transit Time
  Scatter Peak Time: Scatter Peak Time
  Scatter FWHM: Scatter FWHM
  Scatter Size (nm): Scatter Size (nm)
  Incand relPeak: Incand relPeak
  Incand Transit Time: Incand Transit Time
  Incand Peak Time: Incand Peak Time
  Incand FWHM: Incand FWHM
  Incand Delay: Incand Delay
  Incand Mass (fg): Incand Mass (fg)
  Reserved: Reserved
hk_column_mapping:
  Time Stamp: Time Stamp
  Time (sec): Time (sec)
  Sample Flow Controller Read (sccm): Sample Flow Controller Read (sccm)
  Sample Flow Controller Read (vccm): Sample Flow Controller Read (vccm)
  Time Stamp (UTC sec): Time Stamp (UTC sec)
  Elapsed Time: Elapsed Time
  Error Code: Error Code
  Packet Time Stamp: Packet Time Stamp
  Laser TEC Temp (C): Laser TEC Temp (C)
  Crystal TEC Temp (C): Crystal TEC Temp (C)
  Inlet Air Temp (C): Inlet Air Temp (C)
  Computer Heatsink Temp (C): Computer Heatsink Temp (C)
  Laser Heatsink Temp (C): Laser Heatsink Temp (C)
  Outlet Air Temp (C): Outlet Air Temp (C)
  YAG Output Monitor (V): YAG Output Monitor (V)
  Cavity Pressure (hPa): Cavity Pressure (hPa)
  Laser Driver Power Monitor (uA): Laser Driver Power Monitor (uA)
  Laser Driver Current Limit Monitor (A): Laser Driver Current Limit Monitor (A)
  Laser Driver Current Monitor (A): Laser Driver Current Monitor (A)
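
Before converting with a customized copy of this file, it can be worth verifying that no generated placeholders are left in the mapping sections. An illustrative sketch, not part of this commit:

```python
import yaml

# Flag any mapping entries still carrying the generated placeholder value
with open("config_with_mapping.yaml") as f:
    config = yaml.safe_load(f)

for section in ("pbp_column_mapping", "hk_column_mapping"):
    for canonical, file_col in (config.get(section) or {}).items():
        if isinstance(file_col, str) and file_col.startswith("YOUR_COLUMN_NAME_FOR_"):
            print(f"{section}: '{canonical}' is still a placeholder")
```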

meta_files/generate_config.py
@@ -1,9 +1,14 @@
 from __future__ import annotations

 import pandas as pd
 import yaml
 import os
+from pathlib import Path
+from typing import Any


-def infer_general_dtype(dtype):
+def infer_general_dtype(dtype: Any) -> str:
+    """Infer general data type from pandas dtype."""
     if pd.api.types.is_integer_dtype(dtype):
         return "int"
     elif pd.api.types.is_float_dtype(dtype):
@@ -14,10 +19,11 @@ def infer_general_dtype(dtype):
         return "string"


-def load_schema(input_file):
-    ext = os.path.splitext(input_file)[1].lower()
+def load_schema(input_file: str | Path) -> dict[str, str]:
+    """Load schema from input file by inferring column types."""
+    ext = os.path.splitext(str(input_file))[1].lower()

-    if ext == ".csv":
+    if ext in [".csv", ".zip"]:
         df = pd.read_csv(input_file, nrows=100)
     elif ext == ".parquet":
         df = pd.read_parquet(input_file)
@@ -28,7 +34,62 @@ def load_schema(input_file):
     return schema


-def generate_combined_config(pbp_file, hk_file, output_file="config.yaml"):
+def get_canonical_schemas() -> dict[str, dict[str, str]]:
+    """Return canonical column schemas for SP2XR data."""
+    pbp_canonical = {
+        "Time (sec)": "float",
+        "Packet Time Stamp": "float",
+        "Flag": "float",
+        "Dropped Records": "float",
+        "Record Count": "float",
+        "Record Size": "float",
+        "Particle Time Stamp": "float",
+        "Particle Flags": "float",
+        "Scatter relPeak": "float",
+        "Scatter Transit Time": "float",
+        "Scatter Peak Time": "float",
+        "Scatter FWHM": "float",
+        "Scatter Size (nm)": "float",
+        "Incand relPeak": "float",
+        "Incand Transit Time": "float",
+        "Incand Peak Time": "float",
+        "Incand FWHM": "float",
+        "Incand Delay": "float",
+        "Incand Mass (fg)": "float",
+        "Reserved": "float",
+    }
+
+    hk_canonical = {
+        "Time Stamp": "datetime",
+        "Time (sec)": "float",
+        "Sample Flow Controller Read (sccm)": "float",
+        "Sample Flow Controller Read (vccm)": "float",
+        # Core HK columns that are commonly used
+        "Time Stamp (UTC sec)": "float",
+        "Elapsed Time": "float",
+        "Error Code": "float",
+        "Packet Time Stamp": "float",
+        "Laser TEC Temp (C)": "float",
+        "Crystal TEC Temp (C)": "float",
+        "Inlet Air Temp (C)": "float",
+        "Computer Heatsink Temp (C)": "float",
+        "Laser Heatsink Temp (C)": "float",
+        "Outlet Air Temp (C)": "float",
+        "YAG Output Monitor (V)": "float",
+        "Cavity Pressure (hPa)": "float",
+        "Laser Driver Power Monitor (uA)": "float",
+        "Laser Driver Current Limit Monitor (A)": "float",
+        "Laser Driver Current Monitor (A)": "float",
+        # ... (other HK columns can be added as needed)
+    }
+
+    return {"pbp_canonical": pbp_canonical, "hk_canonical": hk_canonical}
+
+
+def generate_combined_config(
+    pbp_file: str | Path, hk_file: str | Path, output_file: str = "config.yaml"
+) -> None:
+    """Generate config file with both schema definitions and column mappings."""
     config = {
         "pbp_schema": load_schema(pbp_file),
         "hk_schema": load_schema(hk_file),
@@ -40,5 +101,128 @@ def generate_combined_config(pbp_file, hk_file, output_file="config.yaml"):
     print(f"Unified config saved to: {output_file}")


+def generate_mapping_template(
+    pbp_file: str | Path,
+    hk_file: str | Path,
+    output_file: str = "config_with_mapping.yaml",
+) -> None:
+    """
+    Generate enhanced config with column mapping templates.
+
+    This creates a config file that allows users to map their instrument-specific
+    column names to the canonical column names used in the main processing pipeline.
+    """
+    # Load actual file schemas
+    pbp_schema = load_schema(pbp_file)
+    hk_schema = load_schema(hk_file)
+
+    # Get canonical schemas
+    canonical_schemas = get_canonical_schemas()
+
+    # Create column mapping templates
+    pbp_mapping = {}
+    hk_mapping = {}
+
+    # For PbP: map file columns to canonical columns
+    for canonical_col in canonical_schemas["pbp_canonical"]:
+        # Try to find exact match first
+        matching_file_col = None
+        for file_col in pbp_schema.keys():
+            if file_col.lower() == canonical_col.lower():
+                matching_file_col = file_col
+                break
+
+        # If exact match found, use it; otherwise leave as template
+        pbp_mapping[canonical_col] = (
+            matching_file_col
+            or f"YOUR_COLUMN_NAME_FOR_{canonical_col.replace(' ', '_').replace('(', '').replace(')', '').upper()}"
+        )
+
+    # For HK: map file columns to canonical columns
+    for canonical_col in canonical_schemas["hk_canonical"]:
+        matching_file_col = None
+        for file_col in hk_schema.keys():
+            if file_col.lower() == canonical_col.lower():
+                matching_file_col = file_col
+                break
+
+        hk_mapping[canonical_col] = (
+            matching_file_col
+            or f"YOUR_COLUMN_NAME_FOR_{canonical_col.replace(' ', '_').replace('(', '').replace(')', '').upper()}"
+        )
+
+    # Build enhanced config
+    config = {
+        "# INSTRUCTIONS": [
+            "This config file contains both schema definitions and column mappings.",
+            "1. The *_schema sections define the data types for your input files.",
+            "2. The *_column_mapping sections map your file columns to canonical names.",
+            "3. Replace placeholder values (YOUR_COLUMN_NAME_FOR_*) with actual column names from your files.",
+            "4. If your file doesn't have a particular canonical column, set it to null or remove the line.",
+            "5. The output parquet files will use the canonical column names for consistency.",
+        ],
+        "pbp_schema": pbp_schema,
+        "hk_schema": hk_schema,
+        "pbp_canonical_schema": canonical_schemas["pbp_canonical"],
+        "hk_canonical_schema": canonical_schemas["hk_canonical"],
+        "pbp_column_mapping": pbp_mapping,
+        "hk_column_mapping": hk_mapping,
+    }
+
+    with open(output_file, "w") as f:
+        yaml.dump(config, f, sort_keys=False, default_flow_style=False)
+
+    print(f"Enhanced config with column mapping saved to: {output_file}")
+    print("\nNext steps:")
+    print(
+        "1. Open the config file and replace placeholder column mappings with your actual column names"
+    )
+    print(
+        "2. Remove or set to null any canonical columns that don't exist in your data"
+    )
+    print("3. Use this config file with the updated CSV to Parquet conversion process")
+
+
+def apply_column_mapping(
+    df: pd.DataFrame, column_mapping: dict[str, str | None]
+) -> pd.DataFrame:
+    """
+    Apply column name mapping to standardize column names.
+
+    Parameters
+    ----------
+    df : pd.DataFrame
+        Input dataframe with instrument-specific column names
+    column_mapping : dict[str, str | None]
+        Mapping from canonical names to file column names
+
+    Returns
+    -------
+    pd.DataFrame
+        DataFrame with standardized column names
+    """
+    # Create reverse mapping: file_column_name -> canonical_name
+    reverse_mapping = {}
+    for canonical_name, file_column in column_mapping.items():
+        if (
+            file_column
+            and file_column in df.columns
+            and not file_column.startswith("YOUR_COLUMN_NAME_FOR_")
+        ):
+            reverse_mapping[file_column] = canonical_name
+
+    # Rename columns using reverse mapping
+    df_renamed = df.rename(columns=reverse_mapping)
+
+    return df_renamed
+
+
-# Example usage
-generate_combined_config("pbp_meta.parquet", "hk_meta.parquet")
+if __name__ == "__main__":
+    # Legacy function for backward compatibility
+    # generate_combined_config("pbp_meta.parquet", "hk_meta.parquet")
+
+    # New enhanced function
+    pbp_tmp_file = "/data/user/bertoz_b/merlin6data/SP2XR_code/tests/data/mini_SP2XR_PbP_20190409110737_x0001.zip"
+    hk_tmp_file = "/data/user/bertoz_b/merlin6data/SP2XR_code/tests/data/mini_SP2XR_hk_20190409110737_x0001.zip"
+    generate_mapping_template(pbp_tmp_file, hk_tmp_file)

scripts/sp2xr_csv2parquet.py
@@ -40,11 +40,27 @@ def csv_to_parquet(
     df.to_parquet(parquet_path, index=False)


-def read_sp2xr_csv(file_path: str, schema: dict[str, Any]) -> dd.DataFrame:
+def read_sp2xr_csv(
+    file_path: str,
+    schema: dict[str, Any],
+    column_mapping: dict[str, str | None] | None = None,
+) -> dd.DataFrame:
     """
     Read a SP2XR CSV/ZIP file with given schema from config.

-    Returns a Dask DataFrame.
+    Parameters
+    ----------
+    file_path : str
+        Path to the CSV/ZIP file
+    schema : dict[str, Any]
+        Schema definition with column types
+    column_mapping : dict[str, str | None] | None
+        Optional mapping from canonical column names to file column names
+
+    Returns
+    -------
+    dd.DataFrame
+        Dask DataFrame with standardized column names
     """
     datetime_cols = [col for col, typ in schema.items() if typ == "datetime"]
     dtype_cols = {col: typ for col, typ in schema.items() if typ != "datetime"}
@@ -56,6 +72,27 @@ def read_sp2xr_csv(file_path: str, schema: dict[str, Any]) -> dd.DataFrame:
         blocksize=None,
         assume_missing=True,
     )

+    # Apply column mapping if provided
+    if column_mapping:
+
+        def apply_column_mapping_partition(pdf: pd.DataFrame) -> pd.DataFrame:
+            """Apply column name mapping to standardize column names."""
+            # Create reverse mapping: file_column_name -> canonical_name
+            reverse_mapping = {}
+            for canonical_name, file_column in column_mapping.items():
+                if (
+                    file_column
+                    and file_column in pdf.columns
+                    and not file_column.startswith("YOUR_COLUMN_NAME_FOR_")
+                ):
+                    reverse_mapping[file_column] = canonical_name
+
+            # Rename columns using reverse mapping
+            return pdf.rename(columns=reverse_mapping)
+
+        df = df.map_partitions(apply_column_mapping_partition, meta=df._meta)
+
     return df


@@ -161,7 +198,7 @@ def save_sp2xr_parquet(df, file_path, target_directory):
     )


-def process_sp2xr_file(file_path, config_path, target_directory):
+def process_sp2xr_file(file_path: str, config_path: str, target_directory: str) -> None:
     """
     This function reads PbP or HK files from the SP2XR

@@ -169,15 +206,16 @@ def process_sp2xr_file(file_path, config_path, target_directory):
     ----------
     file_path : str
         Complete path of the file to read.
-    meta : pandas DataFrame
-        Empty pandas dataframe with the structure expected for the file that is read.
-        This is ised in case the file is empty --> The function will return an empty DataFrame
-        with this structure.
+    config_path : str
+        Path to the YAML configuration file containing schema definitions
+        and optional column mappings.
+    target_directory : str
+        Directory where the output parquet files will be saved.

     Returns
     -------
     Dask DataFrame
-        Content of the file as Dask DataFrame.
+        Content of the file as Dask DataFrame with standardized column names.

     """
     if not os.path.exists(file_path):
@@ -187,14 +225,14 @@ def process_sp2xr_file(file_path, config_path, target_directory):
         raise ValueError("No config file found.")

     with open(config_path) as f:
-        schema = yaml.safe_load(f)
+        config = yaml.safe_load(f)

     tmp_hk = load_matching_hk_file(file_path)
     if tmp_hk.empty:
         warnings.warn("tmp_hk empty or not existing")
         df = pd.DataFrame()
         # pbp_schema = schema["pbp_schema"]
         # hk_schema = schema["hk_schema"]
         return df

     first_val, t0 = tmp_hk[["Time (sec)", "Time Stamp"]].values[0]

     def postprocess_hk(df):
@@ -203,19 +241,22 @@ def process_sp2xr_file(file_path, config_path, target_directory):
     def postprocess_pbp(df):
         return df.fillna(0)

+    # Determine file type and get appropriate schema and mapping
     if "PbP" in file_path:
-        file_schema = schema["pbp_schema"]
+        file_schema = config["pbp_schema"]
+        column_mapping = config.get("pbp_column_mapping", None)
         postprocess = postprocess_pbp

     elif "hk" in file_path:
-        file_schema = schema["hk_schema"]
+        file_schema = config["hk_schema"]
+        column_mapping = config.get("hk_column_mapping", None)
         postprocess = postprocess_hk

     else:
         raise ValueError(f"Unrecognized file type: {file_path}")

     try:
-        df = read_sp2xr_csv(file_path, file_schema)
+        df = read_sp2xr_csv(file_path, file_schema, column_mapping)
         df = postprocess(df)
     except Exception as e:
         logger.error(f"[{file_path}] Unexpected error: {e}")

tests/test_column_mapping.py (new file, 93 lines)
@@ -0,0 +1,93 @@
"""Tests for the column mapping functionality."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import pandas as pd
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
# Add meta_files to path
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent / "meta_files"))
|
||||
from generate_config import apply_column_mapping, get_canonical_schemas
|
||||
|
||||
|
||||
def test_apply_column_mapping():
|
||||
"""Test that column mapping correctly renames columns."""
|
||||
# Create test DataFrame with instrument-specific column names
|
||||
test_df = pd.DataFrame(
|
||||
{
|
||||
"Old_Time_Sec": [1.0, 2.0, 3.0],
|
||||
"Old_Particle_Flags": [0, 1, 0],
|
||||
"Other_Column": ["a", "b", "c"],
|
||||
}
|
||||
)
|
||||
|
||||
# Create mapping from canonical to file column names
|
||||
test_mapping = {
|
||||
"Time (sec)": "Old_Time_Sec",
|
||||
"Particle Flags": "Old_Particle_Flags",
|
||||
"Nonexistent Column": "YOUR_COLUMN_NAME_FOR_NONEXISTENT",
|
||||
"Missing Column": None,
|
||||
}
|
||||
|
||||
result = apply_column_mapping(test_df, test_mapping)
|
||||
|
||||
# Check that columns were renamed correctly
|
||||
assert "Time (sec)" in result.columns
|
||||
assert "Particle Flags" in result.columns
|
||||
assert "Other_Column" in result.columns # Unmapped columns should remain
|
||||
|
||||
# Check that placeholder and None mappings were ignored
|
||||
assert "YOUR_COLUMN_NAME_FOR_NONEXISTENT" not in result.columns
|
||||
assert "Nonexistent Column" not in result.columns
|
||||
|
||||
# Check that data was preserved
|
||||
assert result["Time (sec)"].tolist() == [1.0, 2.0, 3.0]
|
||||
assert result["Particle Flags"].tolist() == [0, 1, 0]
|
||||
|
||||
|
||||
def test_get_canonical_schemas():
|
||||
"""Test that canonical schemas contain expected columns."""
|
||||
schemas = get_canonical_schemas()
|
||||
|
||||
# Check that both PbP and HK canonical schemas exist
|
||||
assert "pbp_canonical" in schemas
|
||||
assert "hk_canonical" in schemas
|
||||
|
||||
# Check for key columns that should be present
|
||||
pbp_canonical = schemas["pbp_canonical"]
|
||||
assert "Time (sec)" in pbp_canonical
|
||||
assert "Particle Flags" in pbp_canonical
|
||||
assert "Incand Mass (fg)" in pbp_canonical
|
||||
|
||||
hk_canonical = schemas["hk_canonical"]
|
||||
assert "Time Stamp" in hk_canonical
|
||||
assert "Time (sec)" in hk_canonical
|
||||
assert "Sample Flow Controller Read (sccm)" in hk_canonical
|
||||
|
||||
|
||||
def test_column_mapping_empty_dataframe():
|
||||
"""Test column mapping with empty DataFrame."""
|
||||
empty_df = pd.DataFrame()
|
||||
mapping = {"Time (sec)": "Old_Time_Sec"}
|
||||
|
||||
result = apply_column_mapping(empty_df, mapping)
|
||||
assert result.empty
|
||||
|
||||
|
||||
def test_column_mapping_no_mapping():
|
||||
"""Test column mapping when no mapping is provided."""
|
||||
test_df = pd.DataFrame({"col1": [1, 2], "col2": [3, 4]})
|
||||
|
||||
result = apply_column_mapping(test_df, {})
|
||||
|
||||
# Should return original DataFrame unchanged
|
||||
pd.testing.assert_frame_equal(result, test_df)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
test_apply_column_mapping()
|
||||
test_get_canonical_schemas()
|
||||
test_column_mapping_empty_dataframe()
|
||||
test_column_mapping_no_mapping()
|
||||
print("All tests passed!")