cleanup config organization
This commit is contained in:
458
scripts/sp2xr_generate_config.py
Normal file
458
scripts/sp2xr_generate_config.py
Normal file
@@ -0,0 +1,458 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Generate SP2XR configuration files by automatically detecting PbP and HK files in a directory.
|
||||
|
||||
This script scans a directory for SP2XR data files (CSV/ZIP/Parquet) and generates
|
||||
configuration files with proper schemas and column mappings.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import pandas as pd
|
||||
import yaml
|
||||
import os
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
|
||||
def infer_general_dtype(dtype: Any) -> str:
|
||||
"""Infer general data type from pandas dtype."""
|
||||
if pd.api.types.is_integer_dtype(dtype):
|
||||
return "int"
|
||||
elif pd.api.types.is_float_dtype(dtype):
|
||||
return "float"
|
||||
elif pd.api.types.is_datetime64_any_dtype(dtype):
|
||||
return "datetime"
|
||||
else:
|
||||
return "string"
|
||||
|
||||
|
||||
def find_sp2xr_files(directory: str | Path) -> tuple[list[Path], list[Path]]:
|
||||
"""
|
||||
Find PbP and HK files in the given directory and all subdirectories.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
directory : str | Path
|
||||
Directory to search for SP2XR files (searches recursively)
|
||||
|
||||
Returns
|
||||
-------
|
||||
tuple[list[Path], list[Path]]
|
||||
Lists of PbP files and HK files found
|
||||
"""
|
||||
directory = Path(directory)
|
||||
if not directory.exists():
|
||||
raise FileNotFoundError(f"Directory not found: {directory}")
|
||||
|
||||
# Common SP2XR file patterns
|
||||
pbp_patterns = ["*PbP*", "*pbp*", "*Pbp*"]
|
||||
hk_patterns = ["*hk*", "*HK*", "*Hk*"]
|
||||
file_extensions = ["*.csv", "*.zip", "*.parquet"]
|
||||
|
||||
pbp_files = []
|
||||
hk_files = []
|
||||
|
||||
# Search for files matching patterns (including subdirectories)
|
||||
for ext in file_extensions:
|
||||
for pattern in pbp_patterns:
|
||||
pbp_files.extend(directory.glob(f"**/{pattern}{ext}"))
|
||||
for pattern in hk_patterns:
|
||||
hk_files.extend(directory.glob(f"**/{pattern}{ext}"))
|
||||
|
||||
# Remove duplicates and sort
|
||||
pbp_files = sorted(list(set(pbp_files)))
|
||||
hk_files = sorted(list(set(hk_files)))
|
||||
|
||||
return pbp_files, hk_files
|
||||
|
||||
|
||||
def load_schema(input_file: str | Path, nrows: int = 100) -> dict[str, str]:
|
||||
"""
|
||||
Load schema from input file by inferring column types.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
input_file : str | Path
|
||||
Path to the input file
|
||||
nrows : int
|
||||
Number of rows to read for type inference (for CSV files)
|
||||
|
||||
Returns
|
||||
-------
|
||||
dict[str, str]
|
||||
Mapping of column names to data types
|
||||
"""
|
||||
ext = os.path.splitext(str(input_file))[1].lower()
|
||||
|
||||
if ext in [".csv", ".zip"]:
|
||||
df = pd.read_csv(input_file, nrows=nrows)
|
||||
elif ext == ".parquet":
|
||||
# For parquet, we can just read the schema without loading data
|
||||
pf = pd.read_parquet(input_file, engine="pyarrow")
|
||||
df = pf.head(0) # Empty dataframe with schema
|
||||
else:
|
||||
raise ValueError(f"Unsupported file format: {ext}")
|
||||
|
||||
schema = {col: infer_general_dtype(dtype) for col, dtype in df.dtypes.items()}
|
||||
return schema
|
||||
|
||||
|
||||
def get_canonical_schemas() -> dict[str, dict[str, str]]:
|
||||
"""Return canonical column schemas for SP2XR data."""
|
||||
pbp_canonical = {
|
||||
"Time (sec)": "float",
|
||||
"Packet Time Stamp": "float",
|
||||
"Flag": "float",
|
||||
"Dropped Records": "float",
|
||||
"Record Count": "float",
|
||||
"Record Size": "float",
|
||||
"Particle Time Stamp": "float",
|
||||
"Particle Flags": "float",
|
||||
"Scatter relPeak": "float",
|
||||
"Scatter Transit Time": "float",
|
||||
"Scatter Peak Time": "float",
|
||||
"Scatter FWHM": "float",
|
||||
"Scatter Size (nm)": "float",
|
||||
"Incand relPeak": "float",
|
||||
"Incand Transit Time": "float",
|
||||
"Incand Peak Time": "float",
|
||||
"Incand FWHM": "float",
|
||||
"Incand Delay": "float",
|
||||
"Incand Mass (fg)": "float",
|
||||
"Reserved": "float",
|
||||
}
|
||||
|
||||
hk_canonical = {
|
||||
"Time Stamp": "datetime",
|
||||
"Time (sec)": "float",
|
||||
"Sample Flow Controller Read (sccm)": "float",
|
||||
"Sample Flow Controller Read (vccm)": "float",
|
||||
"Time Stamp (UTC sec)": "float",
|
||||
"Elapsed Time": "float",
|
||||
"Error Code": "float",
|
||||
"Packet Time Stamp": "float",
|
||||
"Laser TEC Temp (C)": "float",
|
||||
"Crystal TEC Temp (C)": "float",
|
||||
"Inlet Air Temp (C)": "float",
|
||||
"Computer Heatsink Temp (C)": "float",
|
||||
"Laser Heatsink Temp (C)": "float",
|
||||
"Outlet Air Temp (C)": "float",
|
||||
"YAG Output Monitor (V)": "float",
|
||||
"Cavity Pressure (hPa)": "float",
|
||||
"Laser Driver Power Monitor (uA)": "float",
|
||||
"Laser Driver Current Limit Monitor (A)": "float",
|
||||
"Laser Driver Current Monitor (A)": "float",
|
||||
}
|
||||
|
||||
return {"pbp_canonical": pbp_canonical, "hk_canonical": hk_canonical}
|
||||
|
||||
|
||||
def generate_basic_config(
|
||||
pbp_file: Path,
|
||||
hk_file: Path,
|
||||
schema_output: str = "config_schema.yaml",
|
||||
ini_file: str = None,
|
||||
instrument_output: str = None,
|
||||
) -> None:
|
||||
"""Generate basic config schema file with data type definitions only."""
|
||||
print(f"Reading PbP schema from: {pbp_file}")
|
||||
pbp_schema = load_schema(pbp_file)
|
||||
|
||||
print(f"Reading HK schema from: {hk_file}")
|
||||
hk_schema = load_schema(hk_file)
|
||||
|
||||
config = {
|
||||
"pbp_schema": pbp_schema,
|
||||
"hk_schema": hk_schema,
|
||||
}
|
||||
|
||||
# Create output directory if it doesn't exist
|
||||
schema_path = Path(schema_output)
|
||||
schema_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
with open(schema_output, "w") as f:
|
||||
yaml.dump(config, f, sort_keys=False)
|
||||
|
||||
print(f"Data schema config saved to: {schema_output}")
|
||||
|
||||
# Generate separate instrument settings config from INI file
|
||||
if ini_file:
|
||||
# Determine instrument settings output filename
|
||||
if instrument_output:
|
||||
instrument_path = Path(instrument_output)
|
||||
else:
|
||||
instrument_path = (
|
||||
schema_path.parent / f"{schema_path.stem}_instrument_settings.yaml"
|
||||
)
|
||||
|
||||
try:
|
||||
from sp2xr.helpers import export_xr_ini_to_yaml_with_source
|
||||
|
||||
export_xr_ini_to_yaml_with_source(ini_file, str(instrument_path))
|
||||
print(f"Instrument settings config saved to: {instrument_path}")
|
||||
except ImportError:
|
||||
# Fallback to original function if new one doesn't exist yet
|
||||
from sp2xr.helpers import export_xr_ini_to_yaml
|
||||
|
||||
export_xr_ini_to_yaml(ini_file, str(instrument_path))
|
||||
print(f"Instrument settings config saved to: {instrument_path}")
|
||||
except Exception as e:
|
||||
print(f"Warning: Could not convert INI to YAML: {e}")
|
||||
# Still reference the original INI file as fallback
|
||||
config["calibration_file"] = ini_file
|
||||
with open(schema_output, "w") as f:
|
||||
yaml.dump(config, f, sort_keys=False)
|
||||
print(f"Added INI file reference as fallback: {Path(ini_file).name}")
|
||||
|
||||
|
||||
def generate_mapping_config(
|
||||
pbp_file: Path,
|
||||
hk_file: Path,
|
||||
schema_output: str = "config_schema_with_mapping.yaml",
|
||||
ini_file: str = None,
|
||||
instrument_output: str = None,
|
||||
) -> None:
|
||||
"""Generate enhanced config schema with column mapping templates."""
|
||||
print(f"Reading PbP schema from: {pbp_file}")
|
||||
pbp_schema = load_schema(pbp_file)
|
||||
|
||||
print(f"Reading HK schema from: {hk_file}")
|
||||
hk_schema = load_schema(hk_file)
|
||||
|
||||
# Get canonical schemas
|
||||
canonical_schemas = get_canonical_schemas()
|
||||
|
||||
# Create column mapping templates
|
||||
pbp_mapping = {}
|
||||
hk_mapping = {}
|
||||
|
||||
# For PbP: map file columns to canonical columns
|
||||
for canonical_col in canonical_schemas["pbp_canonical"]:
|
||||
matching_file_col = None
|
||||
for file_col in pbp_schema.keys():
|
||||
if file_col.lower() == canonical_col.lower():
|
||||
matching_file_col = file_col
|
||||
break
|
||||
|
||||
pbp_mapping[canonical_col] = (
|
||||
matching_file_col or canonical_col # Use canonical name as default
|
||||
)
|
||||
|
||||
# For HK: map file columns to canonical columns
|
||||
for canonical_col in canonical_schemas["hk_canonical"]:
|
||||
matching_file_col = None
|
||||
for file_col in hk_schema.keys():
|
||||
if file_col.lower() == canonical_col.lower():
|
||||
matching_file_col = file_col
|
||||
break
|
||||
|
||||
hk_mapping[canonical_col] = (
|
||||
matching_file_col or canonical_col # Use canonical name as default
|
||||
)
|
||||
|
||||
# Build enhanced config
|
||||
config = {
|
||||
"# INSTRUCTIONS": [
|
||||
"This config file contains both schema definitions and column mappings.",
|
||||
"1. The *_schema sections define the data types for your input files.",
|
||||
"2. The *_column_mapping sections map your file columns to canonical names.",
|
||||
"3. Update column mappings if your files use different column names.",
|
||||
"4. If your file doesn't have a particular canonical column, set it to null or remove the line.",
|
||||
"5. The output parquet files will use the canonical column names for consistency.",
|
||||
],
|
||||
"pbp_schema": pbp_schema,
|
||||
"hk_schema": hk_schema,
|
||||
"pbp_canonical_schema": canonical_schemas["pbp_canonical"],
|
||||
"hk_canonical_schema": canonical_schemas["hk_canonical"],
|
||||
"pbp_column_mapping": pbp_mapping,
|
||||
"hk_column_mapping": hk_mapping,
|
||||
}
|
||||
|
||||
# Create output directory if it doesn't exist
|
||||
schema_path = Path(schema_output)
|
||||
schema_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
with open(schema_output, "w") as f:
|
||||
yaml.dump(config, f, sort_keys=False, default_flow_style=False)
|
||||
|
||||
print(f"Enhanced data schema config with column mapping saved to: {schema_output}")
|
||||
|
||||
# Generate separate instrument settings config from INI file
|
||||
if ini_file:
|
||||
# Determine instrument settings output filename
|
||||
if instrument_output:
|
||||
instrument_path = Path(instrument_output)
|
||||
else:
|
||||
instrument_path = (
|
||||
schema_path.parent / f"{schema_path.stem}_instrument_settings.yaml"
|
||||
)
|
||||
|
||||
try:
|
||||
from sp2xr.helpers import export_xr_ini_to_yaml_with_source
|
||||
|
||||
export_xr_ini_to_yaml_with_source(ini_file, str(instrument_path))
|
||||
print(f"Instrument settings config saved to: {instrument_path}")
|
||||
except ImportError:
|
||||
# Fallback to original function if new one doesn't exist yet
|
||||
from sp2xr.helpers import export_xr_ini_to_yaml
|
||||
|
||||
export_xr_ini_to_yaml(ini_file, str(instrument_path))
|
||||
print(f"Instrument settings config saved to: {instrument_path}")
|
||||
except Exception as e:
|
||||
print(f"Warning: Could not convert INI to YAML: {e}")
|
||||
# Still reference the original INI file as fallback
|
||||
config["calibration_file"] = ini_file
|
||||
with open(schema_output, "w") as f:
|
||||
yaml.dump(config, f, sort_keys=False, default_flow_style=False)
|
||||
print(f"Added INI file reference as fallback: {Path(ini_file).name}")
|
||||
|
||||
|
||||
def parse_args():
|
||||
"""Parse command line arguments."""
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Generate SP2XR configuration files from data directory",
|
||||
formatter_class=argparse.RawDescriptionHelpFormatter,
|
||||
epilog="""
|
||||
Examples:
|
||||
# Generate basic config schema from files in current directory
|
||||
python sp2xr_generate_config.py .
|
||||
|
||||
# Generate config schema from specific directory
|
||||
python sp2xr_generate_config.py /path/to/sp2xr/data
|
||||
|
||||
# Generate config schema with column mapping support
|
||||
python sp2xr_generate_config.py /path/to/data --mapping
|
||||
|
||||
# Specify custom schema and instrument settings filenames
|
||||
python sp2xr_generate_config.py /path/to/data --schema-output my_schema.yaml --instrument-output my_settings.yaml
|
||||
|
||||
# Generate mapping config with custom names
|
||||
python sp2xr_generate_config.py /path/to/data --mapping --schema-output campaign_schema.yaml --instrument-output campaign_settings.yaml
|
||||
""",
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
"directory", help="Directory containing SP2XR files (PbP and HK files)"
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
"--schema-output",
|
||||
"-s",
|
||||
default="config_schema.yaml",
|
||||
help="Output filename for data schema config (default: config_schema.yaml)",
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
"--instrument-output",
|
||||
"-i",
|
||||
default=None,
|
||||
help="Output filename for instrument settings config (default: {schema_output}_instrument_settings.yaml)",
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
"--mapping",
|
||||
"-m",
|
||||
action="store_true",
|
||||
help="Generate config with column mapping support (creates config_with_mapping.yaml)",
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
"--pbp-file", help="Specify specific PbP file instead of auto-detection"
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
"--hk-file", help="Specify specific HK file instead of auto-detection"
|
||||
)
|
||||
|
||||
return parser.parse_args()
|
||||
|
||||
|
||||
def main():
|
||||
"""Main entry point."""
|
||||
args = parse_args()
|
||||
|
||||
try:
|
||||
# Use specific files if provided, otherwise auto-detect
|
||||
if args.pbp_file and args.hk_file:
|
||||
pbp_file = Path(args.pbp_file)
|
||||
hk_file = Path(args.hk_file)
|
||||
|
||||
if not pbp_file.exists():
|
||||
raise FileNotFoundError(f"PbP file not found: {pbp_file}")
|
||||
if not hk_file.exists():
|
||||
raise FileNotFoundError(f"HK file not found: {hk_file}")
|
||||
|
||||
else:
|
||||
# Auto-detect files in directory
|
||||
print(f"Searching for SP2XR files in: {args.directory}")
|
||||
pbp_files, hk_files = find_sp2xr_files(args.directory)
|
||||
|
||||
if not pbp_files:
|
||||
raise FileNotFoundError(
|
||||
"No PbP files found. Looking for files with 'PbP', 'pbp', or 'Pbp' in the name."
|
||||
)
|
||||
if not hk_files:
|
||||
raise FileNotFoundError(
|
||||
"No HK files found. Looking for files with 'hk', 'HK', or 'Hk' in the name."
|
||||
)
|
||||
|
||||
# Use the first file found for each type
|
||||
pbp_file = pbp_files[0]
|
||||
hk_file = hk_files[0]
|
||||
|
||||
print(f"Found {len(pbp_files)} PbP file(s), using: {pbp_file.name}")
|
||||
print(f"Found {len(hk_files)} HK file(s), using: {hk_file.name}")
|
||||
|
||||
# Check for INI files in the directory
|
||||
try:
|
||||
from sp2xr.helpers import find_and_validate_ini_files
|
||||
|
||||
ini_file = find_and_validate_ini_files(str(args.directory))
|
||||
|
||||
if ini_file:
|
||||
print(f"Found consistent INI calibration file: {Path(ini_file).name}")
|
||||
else:
|
||||
print("No INI calibration files found in directory")
|
||||
|
||||
except ValueError as e:
|
||||
print(f"WARNING: {e}")
|
||||
print("You should process data with different calibrations separately.")
|
||||
except ImportError:
|
||||
print("Could not import INI validation function")
|
||||
|
||||
# Generate configuration
|
||||
if args.mapping:
|
||||
schema_file = (
|
||||
"config_schema_with_mapping.yaml"
|
||||
if args.schema_output == "config_schema.yaml"
|
||||
else args.schema_output
|
||||
)
|
||||
generate_mapping_config(
|
||||
pbp_file,
|
||||
hk_file,
|
||||
schema_file,
|
||||
ini_file if "ini_file" in locals() else None,
|
||||
args.instrument_output,
|
||||
)
|
||||
else:
|
||||
generate_basic_config(
|
||||
pbp_file,
|
||||
hk_file,
|
||||
args.schema_output,
|
||||
ini_file if "ini_file" in locals() else None,
|
||||
args.instrument_output,
|
||||
)
|
||||
|
||||
print("\nConfiguration generation completed successfully!")
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error: {e}")
|
||||
return 1
|
||||
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
exit(main())
|
||||
Reference in New Issue
Block a user