#!/usr/bin/env python3 """ Generate SP2XR configuration files by automatically detecting PbP and HK files in a directory. This script scans a directory for SP2XR data files (CSV/ZIP/Parquet) and generates configuration files with proper schemas and column mappings. """ from __future__ import annotations import argparse import pandas as pd import yaml import os from pathlib import Path from typing import Any def infer_general_dtype(dtype: Any) -> str: """Infer general data type from pandas dtype.""" if pd.api.types.is_integer_dtype(dtype): return "float" # Use float to handle potential NA values elif pd.api.types.is_float_dtype(dtype): return "float" elif pd.api.types.is_datetime64_any_dtype(dtype): return "datetime" else: return "string" def find_sp2xr_files(directory: str | Path) -> tuple[list[Path], list[Path]]: """ Find PbP and HK files in the given directory and all subdirectories. Parameters ---------- directory : str | Path Directory to search for SP2XR files (searches recursively) Returns ------- tuple[list[Path], list[Path]] Lists of PbP files and HK files found """ directory = Path(directory) if not directory.exists(): raise FileNotFoundError(f"Directory not found: {directory}") # Common SP2XR file patterns pbp_patterns = ["*PbP*", "*pbp*", "*Pbp*"] hk_patterns = ["*hk*", "*HK*", "*Hk*"] file_extensions = ["*.csv", "*.zip", "*.parquet"] pbp_files = [] hk_files = [] # Search for files matching patterns (including subdirectories) for ext in file_extensions: for pattern in pbp_patterns: pbp_files.extend(directory.glob(f"**/{pattern}{ext}")) for pattern in hk_patterns: hk_files.extend(directory.glob(f"**/{pattern}{ext}")) # Remove duplicates and sort pbp_files = sorted(list(set(pbp_files))) hk_files = sorted(list(set(hk_files))) return pbp_files, hk_files def load_schema(input_file: str | Path, nrows: int = 100) -> dict[str, str]: """ Load schema from input file by inferring column types. Parameters ---------- input_file : str | Path Path to the input file nrows : int Number of rows to read for type inference (for CSV files) Returns ------- dict[str, str] Mapping of column names to data types """ ext = os.path.splitext(str(input_file))[1].lower() if ext in [".csv", ".zip"]: df = pd.read_csv(input_file, nrows=nrows) elif ext == ".parquet": # For parquet, we can just read the schema without loading data pf = pd.read_parquet(input_file, engine="pyarrow") df = pf.head(0) # Empty dataframe with schema else: raise ValueError(f"Unsupported file format: {ext}") schema = {col: infer_general_dtype(dtype) for col, dtype in df.dtypes.items()} return schema def get_canonical_schemas() -> dict[str, dict[str, str]]: """Return canonical column schemas for SP2XR data.""" pbp_canonical = { "Time (sec)": "float", "Packet Time Stamp": "float", "Flag": "float", "Dropped Records": "float", "Record Count": "float", "Record Size": "float", "Particle Time Stamp": "float", "Particle Flags": "float", "Scatter relPeak": "float", "Scatter Transit Time": "float", "Scatter Peak Time": "float", "Scatter FWHM": "float", "Scatter Size (nm)": "float", "Incand relPeak": "float", "Incand Transit Time": "float", "Incand Peak Time": "float", "Incand FWHM": "float", "Incand Delay": "float", "Incand Mass (fg)": "float", "Reserved": "float", } hk_canonical = { "Time Stamp": "datetime", "Time (sec)": "float", "Sample Flow Controller Read (sccm)": "float", "Sample Flow Controller Read (vccm)": "float", "Time Stamp (UTC sec)": "float", "Elapsed Time": "float", "Error Code": "float", "Packet Time Stamp": "float", "Laser TEC Temp (C)": "float", "Crystal TEC Temp (C)": "float", "Inlet Air Temp (C)": "float", "Computer Heatsink Temp (C)": "float", "Laser Heatsink Temp (C)": "float", "Outlet Air Temp (C)": "float", "YAG Output Monitor (V)": "float", "Cavity Pressure (hPa)": "float", "Laser Driver Power Monitor (uA)": "float", "Laser Driver Current Limit Monitor (A)": "float", "Laser Driver Current Monitor (A)": "float", } return {"pbp_canonical": pbp_canonical, "hk_canonical": hk_canonical} def generate_basic_config( pbp_file: Path, hk_file: Path, schema_output: str = "new_data_schema.yaml", ini_file: str = None, instrument_output: str = None, ) -> None: """Generate basic config schema file with data type definitions only.""" print(f"Reading PbP schema from: {pbp_file}") pbp_schema = load_schema(pbp_file) print(f"Reading HK schema from: {hk_file}") hk_schema = load_schema(hk_file) config = { "pbp_schema": pbp_schema, "hk_schema": hk_schema, } # Create output directory if it doesn't exist schema_path = Path(schema_output) if schema_path.parent != Path("."): schema_path.parent.mkdir(parents=True, exist_ok=True) print(f"Created config directory: {schema_path.parent}") with open(schema_output, "w") as f: yaml.dump(config, f, sort_keys=False) print(f"Data schema config saved to: {schema_path.absolute()}") # Generate separate instrument settings config from INI file if ini_file: # Determine instrument settings output filename if instrument_output: instrument_path = Path(instrument_output) else: instrument_path = schema_path.parent / "new_instrument_settings.yaml" try: from sp2xr.helpers import export_xr_ini_to_yaml_with_source export_xr_ini_to_yaml_with_source(ini_file, str(instrument_path)) # print(f"Instrument settings config saved to: {instrument_path}") except ImportError: # Fallback to original function if new one doesn't exist yet from sp2xr.helpers import export_xr_ini_to_yaml export_xr_ini_to_yaml(ini_file, str(instrument_path)) # print(f"Instrument settings config saved to: {instrument_path}") except Exception as e: print(f"Warning: Could not convert INI to YAML: {e}") # Still reference the original INI file as fallback config["calibration_file"] = ini_file with open(schema_output, "w") as f: yaml.dump(config, f, sort_keys=False) print(f"Added INI file reference as fallback: {Path(ini_file).name}") def generate_mapping_config( pbp_file: Path, hk_file: Path, schema_output: str = "new_data_schema_with_mapping.yaml", ini_file: str = None, instrument_output: str = None, ) -> None: """Generate enhanced config schema with column mapping templates.""" print(f"Reading PbP schema from: {pbp_file}") pbp_schema = load_schema(pbp_file) print(f"Reading HK schema from: {hk_file}") hk_schema = load_schema(hk_file) # Get canonical schemas canonical_schemas = get_canonical_schemas() # Create column mapping templates pbp_mapping = {} hk_mapping = {} # For PbP: map file columns to canonical columns for canonical_col in canonical_schemas["pbp_canonical"]: matching_file_col = None for file_col in pbp_schema.keys(): if file_col.lower() == canonical_col.lower(): matching_file_col = file_col break pbp_mapping[canonical_col] = ( matching_file_col or canonical_col # Use canonical name as default ) # For HK: map file columns to canonical columns for canonical_col in canonical_schemas["hk_canonical"]: matching_file_col = None for file_col in hk_schema.keys(): if file_col.lower() == canonical_col.lower(): matching_file_col = file_col break hk_mapping[canonical_col] = ( matching_file_col or canonical_col # Use canonical name as default ) # Build enhanced config config = { "# INSTRUCTIONS": [ "This config file contains both schema definitions and column mappings.", "1. The *_schema sections define the data types for your input files.", "2. The *_column_mapping sections map your file columns to canonical names.", "3. Update column mappings if your files use different column names.", "4. If your file doesn't have a particular canonical column, set it to null or remove the line.", "5. The output parquet files will use the canonical column names for consistency.", ], "pbp_schema": pbp_schema, "hk_schema": hk_schema, "pbp_canonical_schema": canonical_schemas["pbp_canonical"], "hk_canonical_schema": canonical_schemas["hk_canonical"], "pbp_column_mapping": pbp_mapping, "hk_column_mapping": hk_mapping, } # Create output directory if it doesn't exist schema_path = Path(schema_output) if schema_path.parent != Path("."): schema_path.parent.mkdir(parents=True, exist_ok=True) print(f"Created config directory: {schema_path.parent}") with open(schema_output, "w") as f: yaml.dump(config, f, sort_keys=False, default_flow_style=False) print( f"Enhanced data schema config with column mapping saved to: {schema_path.absolute()}" ) # Generate separate instrument settings config from INI file if ini_file: # Determine instrument settings output filename if instrument_output: instrument_path = Path(instrument_output) else: instrument_path = schema_path.parent / "new_instrument_settings.yaml" try: from sp2xr.helpers import export_xr_ini_to_yaml_with_source export_xr_ini_to_yaml_with_source(ini_file, str(instrument_path)) # print(f"Instrument settings config saved to: {instrument_path}") except ImportError: # Fallback to original function if new one doesn't exist yet from sp2xr.helpers import export_xr_ini_to_yaml export_xr_ini_to_yaml(ini_file, str(instrument_path)) # print(f"Instrument settings config saved to: {instrument_path}") except Exception as e: print(f"Warning: Could not convert INI to YAML: {e}") # Still reference the original INI file as fallback config["calibration_file"] = ini_file with open(schema_output, "w") as f: yaml.dump(config, f, sort_keys=False, default_flow_style=False) print(f"Added INI file reference as fallback: {Path(ini_file).name}") def parse_args(): """Parse command line arguments.""" parser = argparse.ArgumentParser( description="Generate SP2XR configuration files from data directory", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: # Generate basic config schema from files in current directory python sp2xr_generate_config.py . # Generate config schema from specific directory python sp2xr_generate_config.py /path/to/sp2xr/data # Generate config schema with column mapping support python sp2xr_generate_config.py /path/to/data --mapping # Specify custom schema and instrument settings filenames python sp2xr_generate_config.py /path/to/data --schema-output my_schema.yaml --instrument-output my_settings.yaml # Generate mapping config with custom names python sp2xr_generate_config.py /path/to/data --mapping --schema-output campaign_schema.yaml --instrument-output campaign_settings.yaml """, ) parser.add_argument( "directory", help="Directory containing SP2XR files (PbP and HK files)" ) parser.add_argument( "--schema-output", "-s", default="config/new_data_schema.yaml", help="Output filename for data schema config (default: config/new_data_schema.yaml)", ) parser.add_argument( "--instrument-output", "-i", default=None, help="Output filename for instrument settings config (default: {schema_output}_instrument_settings.yaml)", ) parser.add_argument( "--mapping", "-m", action="store_true", help="Generate config with column mapping support (creates config_with_mapping.yaml)", ) parser.add_argument( "--pbp-file", help="Specify specific PbP file instead of auto-detection" ) parser.add_argument( "--hk-file", help="Specify specific HK file instead of auto-detection" ) return parser.parse_args() def main(): """Main entry point.""" args = parse_args() try: # Use specific files if provided, otherwise auto-detect if args.pbp_file and args.hk_file: pbp_file = Path(args.pbp_file) hk_file = Path(args.hk_file) if not pbp_file.exists(): raise FileNotFoundError(f"PbP file not found: {pbp_file}") if not hk_file.exists(): raise FileNotFoundError(f"HK file not found: {hk_file}") else: # Auto-detect files in directory print(f"Searching for SP2XR files in: {args.directory}") pbp_files, hk_files = find_sp2xr_files(args.directory) if not pbp_files: raise FileNotFoundError( "No PbP files found. Looking for files with 'PbP', 'pbp', or 'Pbp' in the name." ) if not hk_files: raise FileNotFoundError( "No HK files found. Looking for files with 'hk', 'HK', or 'Hk' in the name." ) # Use the first file found for each type pbp_file = pbp_files[0] hk_file = hk_files[0] print(f"Found {len(pbp_files)} PbP file(s), using: {pbp_file.name}") print(f"Found {len(hk_files)} HK file(s), using: {hk_file.name}") # Check for INI files in the directory try: from sp2xr.helpers import find_and_validate_ini_files ini_file = find_and_validate_ini_files(str(args.directory)) if ini_file: print(f"Found consistent INI calibration file: {Path(ini_file).name}") else: print("No INI calibration files found in directory") except ValueError as e: print(f"WARNING: {e}") print("You should process data with different calibrations separately.") except ImportError: print("Could not import INI validation function") # Generate configuration if args.mapping: schema_file = ( "config/new_data_schema_with_mapping.yaml" if args.schema_output == "config/new_data_schema.yaml" else args.schema_output ) generate_mapping_config( pbp_file, hk_file, schema_file, ini_file if "ini_file" in locals() else None, args.instrument_output, ) else: generate_basic_config( pbp_file, hk_file, args.schema_output, ini_file if "ini_file" in locals() else None, args.instrument_output, ) print("\nConfiguration generation completed successfully!") except Exception as e: print(f"Error: {e}") return 1 return 0 if __name__ == "__main__": exit(main())